diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java similarity index 78% rename from driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java rename to driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java index b933f6da5c..2bd03b5d31 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java @@ -20,39 +20,23 @@ import java.util.Arrays; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.net.BoltServerAddress; -public class RoundRobinAddressSet +public class AddressSet { private static final BoltServerAddress[] NONE = {}; - private final AtomicInteger offset = new AtomicInteger(); - private volatile BoltServerAddress[] addresses = NONE; - public int size() - { - return addresses.length; - } + private volatile BoltServerAddress[] addresses = NONE; - public BoltServerAddress next() + public BoltServerAddress[] toArray() { - BoltServerAddress[] addresses = this.addresses; - if ( addresses.length == 0 ) - { - return null; - } - return addresses[next( addresses.length )]; + return addresses; } - int next( int divisor ) + public int size() { - int index = offset.getAndIncrement(); - for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() ) - { - offset.compareAndSet( Integer.MIN_VALUE, index % divisor ); - } - return index % divisor; + return addresses.length; } public synchronized void update( Set addresses, Set removed ) @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address ) @Override public String toString() { - return "RoundRobinAddressSet=" + Arrays.toString( addresses ); - } - - /** breaking encapsulation in order to perform white-box testing of boundary case */ - void setOffset( int target ) - { - offset.set( target ); + return "AddressSet=" + Arrays.toString( addresses ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 54a80d60d7..36812ad945 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable private final Clock clock; private volatile long expirationTimeout; - private final RoundRobinAddressSet readers; - private final RoundRobinAddressSet writers; - private final RoundRobinAddressSet routers; + private final AddressSet readers; + private final AddressSet writers; + private final AddressSet routers; public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses ) { @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock ) this.clock = clock; this.expirationTimeout = clock.millis() - 1; - this.readers = new RoundRobinAddressSet(); - this.writers = new RoundRobinAddressSet(); - this.routers = new RoundRobinAddressSet(); + this.readers = new AddressSet(); + this.writers = new AddressSet(); + this.routers = new AddressSet(); } @Override @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address ) } @Override - public RoundRobinAddressSet readers() + public AddressSet readers() { return readers; } @Override - public RoundRobinAddressSet writers() + public AddressSet writers() { return writers; } @Override - public BoltServerAddress nextRouter() + public AddressSet routers() { - return routers.next(); - } - - @Override - public int routerSize() - { - return routers.size(); + return routers; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index ef48041d29..b46fba3111 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -39,6 +39,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au private final ConnectionPool connections; private final RoutingTable routingTable; private final Rediscovery rediscovery; + private final LoadBalancingStrategy loadBalancingStrategy; private final Logger log; public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections, @@ -59,6 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, this.connections = connections; this.routingTable = routingTable; this.rediscovery = rediscovery; + this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); this.log = log; refreshRoutingTable(); @@ -67,7 +69,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, @Override public PooledConnection acquireConnection( AccessMode mode ) { - RoundRobinAddressSet addressSet = addressSetFor( mode ); + AddressSet addressSet = addressSetFor( mode ); PooledConnection connection = acquireConnection( mode, addressSet ); return new RoutingPooledConnection( connection, this, mode ); } @@ -90,10 +92,10 @@ public void close() throws Exception connections.close(); } - private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers ) + private PooledConnection acquireConnection( AccessMode mode, AddressSet servers ) { ensureRouting( mode ); - for ( BoltServerAddress address; (address = servers.next()) != null; ) + for ( BoltServerAddress address; (address = selectAddress( mode, servers )) != null; ) { try { @@ -141,7 +143,7 @@ synchronized void refreshRoutingTable() log.info( "Refreshed routing information. %s", routingTable ); } - private RoundRobinAddressSet addressSetFor( AccessMode mode ) + private AddressSet addressSetFor( AccessMode mode ) { switch ( mode ) { @@ -150,7 +152,22 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode ) case WRITE: return routingTable.writers(); default: - throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + throw unknownMode( mode ); + } + } + + private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers ) + { + BoltServerAddress[] addresses = servers.toArray(); + + switch ( mode ) + { + case READ: + return loadBalancingStrategy.selectReader( addresses ); + case WRITE: + return loadBalancingStrategy.selectWriter( addresses ); + default: + throw unknownMode( mode ); } } @@ -161,4 +178,9 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R new RoutingProcedureClusterCompositionProvider( clock, log, settings ); return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) ); } + + private static RuntimeException unknownMode( AccessMode mode ) + { + return new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java new file mode 100644 index 0000000000..99f40fcd42 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +public interface LoadBalancingStrategy +{ + BoltServerAddress selectReader( BoltServerAddress[] knownReaders ); + + BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index a038da4dfc..de4db16771 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections, Set seenServers ) { - int size = routingTable.routerSize(); - for ( int i = 0; i < size; i++ ) - { - BoltServerAddress address = routingTable.nextRouter(); - if ( address == null ) - { - break; - } + BoltServerAddress[] addresses = routingTable.routers().toArray(); + for ( BoltServerAddress address : addresses ) + { ClusterComposition composition = lookupOnRouter( address, routingTable, connections ); if ( composition != null ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java new file mode 100644 index 0000000000..cc1002a875 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.concurrent.atomic.AtomicInteger; + +public class RoundRobinArrayIndex +{ + private final AtomicInteger offset; + + RoundRobinArrayIndex() + { + this( 0 ); + } + + // only for testing + RoundRobinArrayIndex( int initialOffset ) + { + this.offset = new AtomicInteger( initialOffset ); + } + + public int next( int arrayLength ) + { + if ( arrayLength == 0 ) + { + return -1; + } + + int nextOffset; + while ( (nextOffset = offset.getAndIncrement()) < 0 ) + { + // overflow, try resetting back to zero + offset.compareAndSet( nextOffset + 1, 0 ); + } + return nextOffset % arrayLength; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java new file mode 100644 index 0000000000..35cb127b85 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +public class RoundRobinLoadBalancingStrategy implements LoadBalancingStrategy +{ + private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex(); + private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex(); + + @Override + public BoltServerAddress selectReader( BoltServerAddress[] knownReaders ) + { + return select( knownReaders, readersIndex ); + } + + @Override + public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ) + { + return select( knownWriters, writersIndex ); + } + + private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex roundRobinIndex ) + { + int length = addresses.length; + if ( length == 0 ) + { + return null; + } + int index = roundRobinIndex.next( length ); + return addresses[index]; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 9ec32dddb0..162ab7a51c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -31,13 +31,11 @@ public interface RoutingTable void forget( BoltServerAddress address ); - RoundRobinAddressSet readers(); + AddressSet readers(); - RoundRobinAddressSet writers(); + AddressSet writers(); - BoltServerAddress nextRouter(); - - int routerSize(); + AddressSet routers(); void removeWriter( BoltServerAddress toRemove ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index 8c6ea1f5a3..920b53a71c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -808,11 +808,11 @@ public void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws @Test public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception { - StubServer router1 = StubServer.start( "acquire_endpoints.script", 9010 ); - StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9007 ); + StubServer router1 = StubServer.start( "discover_servers.script", 9010 ); + StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 ); + StubServer router2 = StubServer.start( "acquire_endpoints.script", 9002 ); StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 ); - StubServer router2 = StubServer.start( "discover_servers.script", 9002 ); - StubServer writer = StubServer.start( "write_server.script", 9001 ); + StubServer writer = StubServer.start( "write_server.script", 9007 ); try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9010" ); Session session = driver.session() ) @@ -827,9 +827,9 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw { assertEquals( 0, router1.exitStatus() ); assertEquals( 0, brokenWriter1.exitStatus() ); - assertEquals( 0, brokenWriter2.exitStatus() ); assertEquals( 0, router2.exitStatus() ); assertEquals( 0, writer.exitStatus() ); + assertEquals( 0, brokenWriter2.exitStatus() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java new file mode 100644 index 0000000000..b8af2751a5 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class AddressSetTest +{ + @Test + public void shouldPreserveOrderWhenAdding() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + servers.add( new BoltServerAddress( "fyr" ) ); + set.update( servers, new HashSet() ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" ), + new BoltServerAddress( "fyr" )}, set.toArray() ); + } + + @Test + public void shouldPreserveOrderWhenRemoving() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + set.remove( new BoltServerAddress( "one" ) ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + } + + @Test + public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + servers.remove( new BoltServerAddress( "one" ) ); + set.update( servers, new HashSet() ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + } + + @Test + public void shouldRecordRemovedAddressesWhenUpdating() throws Exception + { + // given + AddressSet set = new AddressSet(); + set.update( addresses( "one", "two", "tre" ), new HashSet() ); + + // when + HashSet removed = new HashSet<>(); + set.update( addresses( "one", "two", "fyr" ), removed ); + + // then + assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); + } + + @Test + public void shouldExposeEmptyArrayWhenEmpty() + { + AddressSet addressSet = new AddressSet(); + + BoltServerAddress[] addresses = addressSet.toArray(); + + assertEquals( 0, addresses.length ); + } + + @Test + public void shouldExposeCorrectArray() + { + AddressSet addressSet = new AddressSet(); + addressSet.update( addresses( "one", "two", "tre" ), new HashSet() ); + + BoltServerAddress[] addresses = addressSet.toArray(); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, addresses ); + } + + @Test + public void shouldHaveSizeZeroWhenEmpty() + { + AddressSet addressSet = new AddressSet(); + + assertEquals( 0, addressSet.size() ); + } + + @Test + public void shouldHaveCorrectSize() + { + AddressSet addressSet = new AddressSet(); + addressSet.update( addresses( "one", "two" ), new HashSet() ); + + assertEquals( 2, addressSet.size() ); + } + + private static Set addresses( String... strings ) + { + Set set = new LinkedHashSet<>(); + for ( String string : strings ) + { + set.add( new BoltServerAddress( string ) ); + } + return set; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 0a12c079cf..2766990516 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -27,7 +27,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; @@ -142,13 +142,7 @@ public void shouldPreserveOrderingOfRouters() routingTable.update( createClusterComposition( routers, EMPTY, EMPTY ) ); - assertEquals( A, routingTable.nextRouter() ); - assertEquals( C, routingTable.nextRouter() ); - assertEquals( D, routingTable.nextRouter() ); - assertEquals( F, routingTable.nextRouter() ); - assertEquals( B, routingTable.nextRouter() ); - assertEquals( E, routingTable.nextRouter() ); - assertEquals( A, routingTable.nextRouter() ); + assertArrayEquals( new BoltServerAddress[]{A, C, D, F, B, E}, routingTable.routers().toArray() ); } @Test @@ -159,12 +153,7 @@ public void shouldPreserveOrderingOfWriters() routingTable.update( createClusterComposition( EMPTY, writers, EMPTY ) ); - assertEquals( D, routingTable.writers().next() ); - assertEquals( F, routingTable.writers().next() ); - assertEquals( A, routingTable.writers().next() ); - assertEquals( C, routingTable.writers().next() ); - assertEquals( E, routingTable.writers().next() ); - assertEquals( D, routingTable.writers().next() ); + assertArrayEquals( new BoltServerAddress[]{D, F, A, C, E}, routingTable.writers().toArray() ); } @Test @@ -175,12 +164,7 @@ public void shouldPreserveOrderingOfReaders() routingTable.update( createClusterComposition( EMPTY, EMPTY, readers ) ); - assertEquals( B, routingTable.readers().next() ); - assertEquals( A, routingTable.readers().next() ); - assertEquals( F, routingTable.readers().next() ); - assertEquals( C, routingTable.readers().next() ); - assertEquals( D, routingTable.readers().next() ); - assertEquals( B, routingTable.readers().next() ); + assertArrayEquals( new BoltServerAddress[]{B, A, F, C, D}, routingTable.readers().toArray() ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 873781b9a9..b787b2356a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -176,9 +175,12 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing @Test public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingSession() { - RoutingTable routingTable = mock( RoutingTable.class, RETURNS_MOCKS ); - ConnectionPool connectionPool = mock( ConnectionPool.class ); BoltServerAddress address = new BoltServerAddress( "host", 42 ); + RoutingTable routingTable = mock( RoutingTable.class ); + AddressSet addressSet = mock( AddressSet.class ); + when( addressSet.toArray() ).thenReturn( new BoltServerAddress[]{address} ); + when( routingTable.writers() ).thenReturn( addressSet ); + ConnectionPool connectionPool = mock( ConnectionPool.class ); PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address ); when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync ); Rediscovery rediscovery = mock( Rediscovery.class ); @@ -225,8 +227,8 @@ public void shouldThrowWhenRediscoveryReturnsNoSuitableServers() RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.isStaleFor( any( AccessMode.class ) ) ).thenReturn( true ); Rediscovery rediscovery = mock( Rediscovery.class ); - when( routingTable.readers() ).thenReturn( new RoundRobinAddressSet() ); - when( routingTable.writers() ).thenReturn( new RoundRobinAddressSet() ); + when( routingTable.readers() ).thenReturn( new AddressSet() ); + when( routingTable.writers() ).thenReturn( new AddressSet() ); LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); @@ -300,11 +302,11 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne when( connPool.acquire( writer ) ).thenReturn( writerConn ); when( connPool.acquire( reader ) ).thenReturn( readConn ); - RoundRobinAddressSet writerAddrs = mock( RoundRobinAddressSet.class ); - when( writerAddrs.next() ).thenReturn( writer ); + AddressSet writerAddrs = mock( AddressSet.class ); + when( writerAddrs.toArray() ).thenReturn( new BoltServerAddress[]{writer} ); - RoundRobinAddressSet readerAddrs = mock( RoundRobinAddressSet.class ); - when( readerAddrs.next() ).thenReturn( reader ); + AddressSet readerAddrs = mock( AddressSet.class ); + when( readerAddrs.toArray() ).thenReturn( new BoltServerAddress[]{reader} ); RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.readers() ).thenReturn( readerAddrs ); @@ -336,7 +338,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) when( routingTable.isStaleFor( mode ) ).thenReturn( true ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( new HashSet() ); - RoundRobinAddressSet addresses = new RoundRobinAddressSet(); + AddressSet addresses = new AddressSet(); addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet() ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index b204335145..33966304db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -23,6 +23,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.mockito.InOrder; import java.util.ArrayList; import java.util.Collection; @@ -49,6 +50,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -451,6 +453,52 @@ public void shouldNotUseInitialRouterTwiceIfRoutingTableContainsIt() } } + public static class KnownRoutersTest + { + @Test + public void shouldProbeAllKnownRoutersInOrder() + { + PooledConnection brokenConnection1 = mock( PooledConnection.class ); + PooledConnection goodConnection = mock( PooledConnection.class ); + PooledConnection brokenConnection2 = mock( PooledConnection.class ); + + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( A ) ).thenReturn( brokenConnection1 ); + when( connections.acquire( B ) ).thenReturn( goodConnection ); + when( connections.acquire( C ) ).thenReturn( brokenConnection2 ); + + ClusterCompositionProvider clusterComposition = mock( ClusterCompositionProvider.class ); + when( clusterComposition.getClusterComposition( brokenConnection1 ) ) + .thenThrow( new ServiceUnavailableException( "Can't connect" ) ); + when( clusterComposition.getClusterComposition( goodConnection ) ) + .thenReturn( success( VALID_CLUSTER_COMPOSITION ) ); + when( clusterComposition.getClusterComposition( brokenConnection2 ) ) + .thenThrow( new ServiceUnavailableException( "Can't connect" ) ); + + RoutingTable routingTable = new TestRoutingTable( A, B, C ); + + RoutingSettings settings = new RoutingSettings( 1, 0, null ); + Clock mockedClock = mock( Clock.class ); + Logger mockedLogger = mock( Logger.class ); + + Rediscovery rediscovery = new Rediscovery( A, settings, mockedClock, mockedLogger, clusterComposition, + directMapProvider ); + + ClusterComposition composition1 = rediscovery.lookupClusterComposition( routingTable, connections ); + assertEquals( VALID_CLUSTER_COMPOSITION, composition1 ); + + ClusterComposition composition2 = rediscovery.lookupClusterComposition( routingTable, connections ); + assertEquals( VALID_CLUSTER_COMPOSITION, composition2 ); + + // server A should've been removed after an unsuccessful attempt + InOrder inOrder = inOrder( clusterComposition ); + inOrder.verify( clusterComposition ).getClusterComposition( brokenConnection1 ); + inOrder.verify( clusterComposition, times( 2 ) ).getClusterComposition( goodConnection ); + + verify( clusterComposition, never() ).getClusterComposition( brokenConnection2 ); + } + } + private static ClusterComposition rediscover( ConnectionPool connections, RoutingTable routingTable, ClusterCompositionProvider provider ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java deleted file mode 100644 index 390ae234ec..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; - -import org.junit.Test; - -import org.neo4j.driver.internal.net.BoltServerAddress; - -import static java.util.Arrays.asList; -import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -public class RoundRobinAddressSetTest -{ - @Test - public void shouldReturnNullWhenEmpty() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - // then - assertNull( set.next() ); - } - - @Test - public void shouldReturnRoundRobin() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ), new HashSet() ); - - // when - BoltServerAddress a = set.next(); - BoltServerAddress b = set.next(); - BoltServerAddress c = set.next(); - - // then - assertEquals( a, set.next() ); - assertEquals( b, set.next() ); - assertEquals( c, set.next() ); - assertEquals( a, set.next() ); - assertEquals( b, set.next() ); - assertEquals( c, set.next() ); - assertNotEquals( a, c ); - assertNotEquals( b, a ); - assertNotEquals( c, b ); - } - - @Test - public void shouldPreserveOrderWhenAdding() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 4 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - servers.add( new BoltServerAddress( "fyr" ) ); - set.update( servers, new HashSet() ); - - // then - assertEquals( order.get( 1 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - BoltServerAddress next = set.next(); - assertNotEquals( order.get( 0 ), next ); - assertNotEquals( order.get( 1 ), next ); - assertNotEquals( order.get( 2 ), next ); - assertEquals( order.get( 0 ), set.next() ); - // ... and once more - assertEquals( order.get( 1 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( next, set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldPreserveOrderWhenRemoving() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 2 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - set.remove( order.get( 1 ) ); - - // then - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 2 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - servers.remove( order.get( 1 ) ); - set.update( servers, new HashSet() ); - - // then - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldRecordRemovedAddressesWhenUpdating() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( - new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ), - new HashSet() ); - - // when - HashSet removed = new HashSet<>(); - set.update( - new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "fyr" ) ) ), - removed ); - - // then - assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); - } - - @Test - public void shouldPreserveOrderEvenWhenIntegerOverflows() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - for ( int div = 1; div <= 1024; div++ ) - { - // when - white box testing! - set.setOffset( Integer.MAX_VALUE - 1 ); - int a = set.next( div ); - int b = set.next( div ); - - // then - if ( b != (a + 1) % div ) - { - fail( String.format( "a=%d, b=%d, div=%d, (a+1)%%div=%d", a, b, div, (a + 1) % div ) ); - } - } - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java new file mode 100644 index 0000000000..a91b678ca4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class RoundRobinArrayIndexTest +{ + @Test + public void shouldHandleZeroLength() + { + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex(); + + int index = roundRobinIndex.next( 0 ); + + assertEquals( -1, index ); + } + + @Test + public void shouldReturnIndexesInRoundRobinOrder() + { + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex(); + + for ( int i = 0; i < 10; i++ ) + { + int index = roundRobinIndex.next( 10 ); + assertEquals( i, index ); + } + + for ( int i = 0; i < 5; i++ ) + { + int index = roundRobinIndex.next( 5 ); + assertEquals( i, index ); + } + } + + @Test + public void shouldHandleOverflow() + { + int arrayLength = 10; + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex( Integer.MAX_VALUE - 1 ); + + assertEquals( (Integer.MAX_VALUE - 1) % arrayLength, roundRobinIndex.next( arrayLength ) ); + assertEquals( Integer.MAX_VALUE % arrayLength, roundRobinIndex.next( arrayLength ) ); + assertEquals( 0, roundRobinIndex.next( arrayLength ) ); + assertEquals( 1, roundRobinIndex.next( arrayLength ) ); + assertEquals( 2, roundRobinIndex.next( arrayLength ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java new file mode 100644 index 0000000000..63ef0ad8e4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RoundRobinLoadBalancingStrategyTest +{ + private final RoundRobinLoadBalancingStrategy strategy = new RoundRobinLoadBalancingStrategy(); + + @Test + public void shouldHandleEmptyReadersArray() + { + assertNull( strategy.selectReader( new BoltServerAddress[0] ) ); + } + + @Test + public void shouldHandleEmptyWritersArray() + { + assertNull( strategy.selectWriter( new BoltServerAddress[0] ) ); + } + + @Test + public void shouldHandleSingleReader() + { + BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); + + assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + } + + @Test + public void shouldHandleSingleWriter() + { + BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); + + assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + } + + @Test + public void shouldReturnReadersInRoundRobinOrder() + { + BoltServerAddress address1 = new BoltServerAddress( "server-1", 1 ); + BoltServerAddress address2 = new BoltServerAddress( "server-2", 2 ); + BoltServerAddress address3 = new BoltServerAddress( "server-3", 3 ); + BoltServerAddress address4 = new BoltServerAddress( "server-4", 4 ); + + BoltServerAddress[] readers = {address1, address2, address3, address4}; + + assertEquals( address1, strategy.selectReader( readers ) ); + assertEquals( address2, strategy.selectReader( readers ) ); + assertEquals( address3, strategy.selectReader( readers ) ); + assertEquals( address4, strategy.selectReader( readers ) ); + + assertEquals( address1, strategy.selectReader( readers ) ); + assertEquals( address2, strategy.selectReader( readers ) ); + assertEquals( address3, strategy.selectReader( readers ) ); + assertEquals( address4, strategy.selectReader( readers ) ); + } + + @Test + public void shouldReturnWriterInRoundRobinOrder() + { + BoltServerAddress address1 = new BoltServerAddress( "server-1", 1 ); + BoltServerAddress address2 = new BoltServerAddress( "server-2", 2 ); + BoltServerAddress address3 = new BoltServerAddress( "server-3", 3 ); + + BoltServerAddress[] writers = {address1, address2, address3}; + + assertEquals( address1, strategy.selectWriter( writers ) ); + assertEquals( address2, strategy.selectWriter( writers ) ); + assertEquals( address3, strategy.selectWriter( writers ) ); + + assertEquals( address1, strategy.selectWriter( writers ) ); + assertEquals( address2, strategy.selectWriter( writers ) ); + assertEquals( address3, strategy.selectWriter( writers ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index 4433673370..a734dae5a1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -28,8 +28,8 @@ import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.SessionFactory; import org.neo4j.driver.internal.SessionFactoryImpl; +import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.LoadBalancer; -import org.neo4j.driver.internal.cluster.RoundRobinAddressSet; import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -48,9 +48,11 @@ public static Matcher containsRouter( final BoltServerAddress addr @Override protected boolean matchesSafely( RoutingTable routingTable ) { - for ( int i = 0; i < routingTable.routerSize(); i++ ) + BoltServerAddress[] addresses = routingTable.routers().toArray(); + + for ( BoltServerAddress currentAddress : addresses ) { - if ( routingTable.nextRouter().equals( address ) ) + if ( currentAddress.equals( address ) ) { return true; } @@ -157,11 +159,12 @@ public void describeTo( Description description ) }; } - private static boolean contains( RoundRobinAddressSet set, BoltServerAddress address ) + private static boolean contains( AddressSet set, BoltServerAddress address ) { - for ( int i = 0; i < set.size(); i++ ) + BoltServerAddress[] addresses = set.toArray(); + for ( BoltServerAddress currentAddress : addresses ) { - if ( set.next().equals( address ) ) + if ( currentAddress.equals( address ) ) { return true; }