diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java index 3ea62e9636..0cea5bc596 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java @@ -19,71 +19,15 @@ package org.neo4j.driver.internal.cluster; import java.util.HashSet; -import java.util.List; import java.util.Set; -import org.neo4j.driver.internal.NetworkSession; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Record; -import org.neo4j.driver.v1.Statement; -import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Value; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.exceptions.value.ValueException; import org.neo4j.driver.v1.util.Function; final class ClusterComposition { - interface Provider - { - String GET_SERVERS = "CALL dbms.cluster.routing.getServers"; - - ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException; - - final class Default implements Provider - { - private static final Statement GET_SERVER = new Statement( Provider.GET_SERVERS ); - private final Clock clock; - private final Logger log; - - Default( Clock clock, Logger log ) - { - this.clock = clock; - this.log = log; - } - - @Override - public ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException - { - StatementResult cursor = getServers( connection ); - List records = cursor.list(); - log.info( "Got getServers response: %s", records ); - long now = clock.millis(); - try - { - if ( records.size() != 1 ) - { - // server returned too few or too many rows, this is a contract violation, treat as incapable - return null; - } - return read( records.get( 0 ), now ); - } - finally - { - cursor.consume(); // make sure we exhaust the results - } - } - - private StatementResult getServers( Connection connection ) - { - return NetworkSession.run( connection, GET_SERVER ); - } - } - } - private static final long MAX_TTL = Long.MAX_VALUE / 1000L; private static final Function OF_BoltServerAddress = new Function() @@ -94,8 +38,11 @@ public BoltServerAddress apply( Value value ) return new BoltServerAddress( value.asString() ); } }; - private final Set readers, writers, routers; - final long expirationTimestamp; + + private final Set readers; + private final Set writers; + private final Set routers; + private final long expirationTimestamp; private ClusterComposition( long expirationTimestamp ) { @@ -118,9 +65,13 @@ private ClusterComposition( long expirationTimestamp ) this.routers.addAll( routers ); } - public boolean isValid() + public boolean hasWriters() { - return !routers.isEmpty() && !writers.isEmpty(); + return !writers.isEmpty(); + } + public boolean hasRoutersAndReaders() + { + return routers.isEmpty() || readers.isEmpty(); } public Set readers() @@ -138,6 +89,10 @@ public Set routers() return new HashSet<>( routers ); } + public long expirationTimestamp() { + return this.expirationTimestamp; + } + @Override public String toString() { @@ -149,32 +104,25 @@ public String toString() '}'; } - private static ClusterComposition read( Record record, long now ) + public static ClusterComposition parse( Record record, long now ) { if ( record == null ) { return null; } - try + + final ClusterComposition result = new ClusterComposition( expirationTimestamp( now, record ) ); + record.get( "servers" ).asList( new Function() { - final ClusterComposition result; - result = new ClusterComposition( expirationTimestamp( now, record ) ); - record.get( "servers" ).asList( new Function() + @Override + public Void apply( Value value ) { - @Override - public Void apply( Value value ) - { - result.servers( value.get( "role" ).asString() ) - .addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) ); - return null; - } - } ); - return result; - } - catch ( ValueException e ) - { - return null; - } + result.servers( value.get( "role" ).asString() ) + .addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) ); + return null; + } + } ); + return result; } private static long expirationTimestamp( long now, Record record ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java new file mode 100644 index 0000000000..70e0fd901f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.neo4j.driver.internal.spi.Connection; + +public interface ClusterCompositionProvider +{ + ClusterCompositionResponse getClusterComposition( Connection connection ); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionResponse.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionResponse.java new file mode 100644 index 0000000000..640e449c25 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionResponse.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +public interface ClusterCompositionResponse +{ + ClusterComposition clusterComposition(); + + class Failure implements ClusterCompositionResponse + { + private final RuntimeException error; + + public Failure( RuntimeException t ) + { + this.error = t; + } + + @Override + public ClusterComposition clusterComposition() + { + throw this.error; + } + } + + class Success implements ClusterCompositionResponse + { + private final ClusterComposition cluster; + + public Success( ClusterComposition cluster ) + { + this.cluster = cluster; + } + + @Override + public ClusterComposition clusterComposition() + { + return cluster; + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index a5272fd90d..ea699d47fc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -66,7 +66,7 @@ public boolean isStale() @Override public synchronized Set update( ClusterComposition cluster ) { - expirationTimeout = cluster.expirationTimestamp; + expirationTimeout = cluster.expirationTimestamp(); HashSet removed = new HashSet<>(); readers.update( cluster.readers(), removed ); writers.update( cluster.writers(), removed ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java new file mode 100644 index 0000000000..9d610b2f39 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.List; + +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ProtocolException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.value.ValueException; + +import static java.lang.String.format; + +public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider +{ + private final String GET_SERVERS = "dbms.cluster.routing.getServers"; + private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + + "' result received from server due to "; + + private final Clock clock; + private final Logger log; + private final GetServersProcedureRunner getServersRunner; + + public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log ) + { + this( clock, log, new GetServersProcedureRunner() ); + } + + GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner ) + { + this.clock = clock; + this.log = log; + this.getServersRunner = getServersRunner; + } + + @Override + public ClusterCompositionResponse getClusterComposition( Connection connection ) + { + List records = null; + + // failed to invoke procedure + try + { + records = getServersRunner.run( connection ); + } + catch ( ClientException e ) + { + return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format( + "Failed to call '%s' procedure on server. " + + "Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) ); + } + + log.info( "Got getServers response: %s", records ); + long now = clock.millis(); + + // the record size is wrong + if ( records.size() != 1 ) + { + return new ClusterCompositionResponse.Failure( new ProtocolException( format( + "%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE, + records.size() ) ) ); + } + + // failed to parse the record + ClusterComposition cluster; + try + { + cluster = ClusterComposition.parse( records.get( 0 ), now ); + } + catch ( ValueException e ) + { + return new ClusterCompositionResponse.Failure( new ProtocolException( format( + "%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) ); + } + + // the cluster result is not a legal reply + if ( cluster.hasRoutersAndReaders() ) + { + return new ClusterCompositionResponse.Failure( new ProtocolException( format( + "%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) ); + } + + // all good + return new ClusterCompositionResponse.Success( cluster ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java new file mode 100644 index 0000000000..244fedc14f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal.cluster; + +import java.util.List; + +import org.neo4j.driver.internal.NetworkSession; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Statement; + +public class GetServersProcedureRunner +{ + private final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers"; + + public List run( Connection connection ) + { + return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index b3414a84ba..3f39469f75 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -26,11 +26,12 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.lang.String.format; -public final class LoadBalancer implements RoutingErrorHandler, AutoCloseable +public class LoadBalancer implements RoutingErrorHandler, AutoCloseable { private final Logger log; @@ -46,7 +47,7 @@ public LoadBalancer( BoltServerAddress... routingAddresses ) throws ServiceUnavailableException { this( settings, clock, log, connections, new ClusterRoutingTable( clock, routingAddresses ), - new ClusterComposition.Provider.Default( clock, log ) ); + new GetServersProcedureClusterCompositionProvider( clock, log ) ); } private LoadBalancer( @@ -55,7 +56,7 @@ private LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, - ClusterComposition.Provider provider ) throws ServiceUnavailableException + ClusterCompositionProvider provider ) throws ServiceUnavailableException { this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) ); } @@ -132,7 +133,7 @@ private synchronized void forget( BoltServerAddress address ) connections.purge( address ); } - private synchronized void ensureRouting() throws ServiceUnavailableException + synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolException { if ( routingTable.isStale() ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index 2162c5507d..6a1c7c5c9c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -34,9 +34,9 @@ public class Rediscovery private final RoutingSettings settings; private final Clock clock; private final Logger logger; - private final ClusterComposition.Provider provider; + private final ClusterCompositionProvider provider; - public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterComposition.Provider provider ) + public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterCompositionProvider provider ) { this.settings = settings; this.clock = clock; @@ -47,13 +47,11 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste // Given the current routing table and connection pool, use the connection composition provider to fetch a new // cluster composition, which would be used to update the routing table and connection pool public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable ) - throws InterruptedException, ServiceUnavailableException + throws InterruptedException { - int size = routingTable.routerSize(), failures = 0; - if ( size == 0 ) - { - throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ); - } + assertHasRouters( routingTable ); + int failures = 0; + for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) ) { long waitTime = start + delay - clock.millis(); @@ -62,6 +60,8 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin clock.sleep( waitTime ); } start = clock.millis(); + + int size = routingTable.routerSize(); for ( int i = 0; i < size; i++ ) { BoltServerAddress address = routingTable.nextRouter(); @@ -69,28 +69,25 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin { throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ); } - ClusterComposition cluster; + + ClusterCompositionResponse response = null; try ( Connection connection = connections.acquire( address ) ) { - cluster = provider.getClusterComposition( connection ); - logger.info( "Got cluster composition %s", cluster ); + response = provider.getClusterComposition( connection ); } - catch ( Exception e ) + catch ( Throwable e ) { + // the connection breaks logger.error( format( "Failed to connect to routing server '%s'.", address ), e ); - continue; - } - if ( cluster == null || !cluster.isValid() ) - { - logger.info( "Server <%s> unable to perform routing capability, dropping from list of routers.", - address ); routingTable.removeRouter( address ); - if ( --size == 0 ) - { - throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ); - } + + assertHasRouters( routingTable ); + continue; } - else + + ClusterComposition cluster = response.clusterComposition(); + logger.info( "Got cluster composition %s", cluster ); + if ( cluster.hasWriters() ) { return cluster; } @@ -101,4 +98,12 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin } } } + + private void assertHasRouters( RoutingTable table ) + { + if ( table.routerSize() == 0 ) + { + throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/exceptions/ProtocolException.java b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ProtocolException.java new file mode 100644 index 0000000000..4f11a95e89 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/v1/exceptions/ProtocolException.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.exceptions; + +/** + * A signal that the contract for client-server communication has broken down. + * The user should contact support and cannot resolve this his or herself. + */ +public class ProtocolException extends Neo4jException +{ + private static final String CODE = "Protocol violation: "; + + public ProtocolException( String message ) + { + super( CODE + message ); + } + + public ProtocolException( String message, Throwable e ) + { + super( CODE + message, e ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 745c12b5bb..4334a9506c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -42,12 +42,15 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.util.Arrays.asList; import static junit.framework.TestCase.fail; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -55,8 +58,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.cluster.ClusterCompositionProviderTest.serverInfo; -import static org.neo4j.driver.internal.cluster.ClusterCompositionProviderTest.withKeys; -import static org.neo4j.driver.internal.cluster.ClusterCompositionProviderTest.withServerList; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; import static org.neo4j.driver.v1.Values.value; @@ -71,7 +72,7 @@ public class RoutingDriverTest private final Logging logging = EventLogger.provider( events, EventLogger.Level.TRACE ); @Test - public void shouldDoRoutingOnInitialization() + public void shouldDiscoveryOnInitialization() { // Given ConnectionPool pool = poolWithServers( @@ -88,13 +89,13 @@ public void shouldDoRoutingOnInitialization() } @Test - public void shouldDoReRoutingOnSessionAcquisitionIfNecessary() + public void shouldRediscoveryIfNoWritersProvided() { // Given RoutingDriver routingDriver = driverWithPool( pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111" ), - serverInfo( "READ" ), - serverInfo( "WRITE", "localhost:5555" ) ), + serverInfo( "WRITE" ), + serverInfo( "READ", "localhost:5555" ) ), withServers( 10, serverInfo( "ROUTE", "localhost:1112" ), serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ) ) ); @@ -107,7 +108,7 @@ public void shouldDoReRoutingOnSessionAcquisitionIfNecessary() } @Test - public void shouldNotDoReRoutingOnSessionAcquisitionIfNotNecessary() + public void shouldNotRediscoveryOnSessionAcquisitionIfNotNecessary() { // Given RoutingDriver routingDriver = driverWithPool( pool( @@ -142,7 +143,7 @@ public void shouldFailIfNoRouting() // Then catch ( ServiceUnavailableException e ) { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); + assertThat( e.getMessage(), containsString( "Failed to call 'dbms.cluster.routing.getServers' procedure on server" ) ); } } @@ -162,21 +163,21 @@ public void shouldFailIfNoRoutersProvided() driverWithPool( pool ); } // Then - catch ( ServiceUnavailableException e ) + catch ( ProtocolException e ) { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); + assertThat( e.getMessage(), containsString( "no router or reader found in response" ) ); } } @Test - public void shouldFailIfNoWritersProvided() + public void shouldFailIfNoReaderProvided() { // Given ConnectionPool pool = poolWithServers( 10, + serverInfo( "READ" ), serverInfo( "ROUTE", "localhost:1111" ), - serverInfo( "READ", "localhost:1111" ), - serverInfo( "WRITE" ) ); + serverInfo( "WRITE", "localhost:1111" ) ); // When try @@ -184,20 +185,20 @@ public void shouldFailIfNoWritersProvided() driverWithPool( pool ); } // Then - catch ( ServiceUnavailableException e ) + catch ( ProtocolException e ) { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); + assertThat( e.getMessage(), containsString( "no router or reader found in response" ) ); } } @Test - public void shouldForgetAboutServersOnRerouting() + public void shouldForgetServersOnRediscovery() { // Given ConnectionPool pool = pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111" ), - serverInfo( "READ" ), - serverInfo( "WRITE", "localhost:5555" ) ), + serverInfo( "READ", "localhost:5555" ), + serverInfo( "WRITE" ) ), withServers( 10, serverInfo( "ROUTE", "localhost:1112" ), serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ) ); @@ -390,4 +391,58 @@ public Connection answer( InvocationOnMock invocationOnMock ) throws Throwable return pool; } + + private static CollectorAnswer withKeys( final String... keys ) + { + return new CollectorAnswer() + { + @Override + void collect( Collector collector ) + { + collector.keys( keys ); + } + }; + } + + private static CollectorAnswer withServerList( final Value[]... records ) + { + return new CollectorAnswer() + { + @Override + void collect( Collector collector ) + { + for ( Value[] fields : records ) + { + collector.record( fields ); + } + } + }; + } + + private static abstract class CollectorAnswer implements Answer + { + abstract void collect( Collector collector ); + + @Override + public final Object answer( InvocationOnMock invocation ) throws Throwable + { + Collector collector = collector( invocation ); + collect( collector ); + collector.done(); + return null; + } + + private Collector collector( InvocationOnMock invocation ) + { + switch ( invocation.getMethod().getName() ) + { + case "pullAll": + return invocation.getArgumentAt( 0, Collector.class ); + case "run": + return invocation.getArgumentAt( 2, Collector.class ); + default: + throw new UnsupportedOperationException( invocation.getMethod().getName() ); + } + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java index 0969396055..ab000c29a4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java @@ -17,217 +17,258 @@ * limitations under the License. */ package org.neo4j.driver.internal.cluster; - import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.mockito.stubbing.Stubber; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.neo4j.driver.internal.EventHandler; +import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.net.pooling.PooledConnection; import org.neo4j.driver.internal.spi.Collector; -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.util.FakeClock; -import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.value.StringValue; +import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Matchers.anyMap; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; import static org.neo4j.driver.v1.Values.value; public class ClusterCompositionProviderTest { - private final FakeClock clock = new FakeClock( (EventHandler) null, true ); - private final Connection connection = mock( Connection.class ); - @Test - public void shouldParseClusterComposition() throws Exception + public void shouldProtocolErrorWhenNoRecord() throws Throwable { - // given - clock.progress( 16500 ); - keys( "ttl", "servers" ); - values( new Value[] { - value( 100 ), value( asList( - serverInfo( "READ", "one:1337", "two:1337" ), - serverInfo( "WRITE", "one:1337" ), - serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )} ); + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ), + DEV_NULL_LOGGER, mockedRunner ); - // when - ClusterComposition composition = getClusterComposition(); + PooledConnection mockedConn = mock( PooledConnection.class ); + ArrayList emptyRecord = new ArrayList<>(); + when( mockedRunner.run( mockedConn ) ).thenReturn( emptyRecord ); - // then - assertNotNull( composition ); - assertEquals( 16500 + 100_000, composition.expirationTimestamp ); - assertEquals( serverSet( "one:1337", "two:1337" ), composition.readers() ); - assertEquals( serverSet( "one:1337" ), composition.writers() ); - assertEquals( serverSet( "one:1337", "two:1337" ), composition.routers() ); + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); + + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) ); + try + { + response.clusterComposition(); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e.getMessage(), containsString( "records received '0' is too few or too many." ) ); + } } @Test - public void shouldReturnNullIfResultContainsTooManyRows() throws Exception + public void shouldProtocolErrorWhenMoreThanOneRecord() throws Throwable { - // given - keys( "ttl", "servers" ); - values( - new Value[] { - value( 100 ), value( singletonList( - serverInfo( "READ", "one:1337", "two:1337" ) ) )}, - new Value[] { - value( 100 ), value( singletonList( - serverInfo( "WRITE", "one:1337" ) ) )}, - new Value[] { - value( 100 ), value( singletonList( - serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )} ); - - // then - assertNull( getClusterComposition() ); + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ), + DEV_NULL_LOGGER, mockedRunner ); + + PooledConnection mockedConn = mock( PooledConnection.class ); + Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); + when( mockedRunner.run( mockedConn ) ).thenReturn( asList( aRecord, aRecord ) ); + + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); + + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) ); + try + { + response.clusterComposition(); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e.getMessage(), containsString( "records received '2' is too few or too many." ) ); + } } @Test - public void shouldReturnNullOnEmptyResult() throws Exception + public void shouldProtocolErrorWhenUnparsableRecord() throws Throwable { - // given - keys( "ttl", "servers" ); - values(); + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ), + DEV_NULL_LOGGER, mockedRunner ); + + PooledConnection mockedConn = mock( PooledConnection.class ); + Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); + when( mockedRunner.run( mockedConn ) ).thenReturn( asList( aRecord ) ); + + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); - // then - assertNull( getClusterComposition() ); + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) ); + try + { + response.clusterComposition(); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e.getMessage(), containsString( "unparsable record received." ) ); + } } @Test - public void shouldReturnNullOnResultWithWrongFormat() throws Exception + public void shouldProtocolErrorWhenNoRouters() throws Throwable { - // given - clock.progress( 16500 ); - keys( "ttl", "addresses" ); - values( new Value[] { + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + Clock mockedClock = mock( Clock.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mockedClock, + DEV_NULL_LOGGER, mockedRunner ); + + PooledConnection mockedConn = mock( PooledConnection.class ); + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ value( 100 ), value( asList( serverInfo( "READ", "one:1337", "two:1337" ), - serverInfo( "WRITE", "one:1337" ), - serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )} ); + serverInfo( "WRITE", "one:1337" ) ) ) + } ); + when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); - // then - assertNull( getClusterComposition() ); - } + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); - @Test - public void shouldPropagateConnectionFailureExceptions() throws Exception - { - // given - ServiceUnavailableException expected = new ServiceUnavailableException( "spanish inquisition" ); - onGetServers( doThrow( expected ) ); - - // when + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) ); try { - getClusterComposition(); - fail( "Expected exception" ); + response.clusterComposition(); + fail( "Expecting a failure but not triggered." ); } - // then - catch ( ServiceUnavailableException e ) + catch( Exception e ) { - assertSame( expected, e ); + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e.getMessage(), containsString( "no router or reader found in response." ) ); } } - private ClusterComposition getClusterComposition() + @Test + public void shouldProtocolErrorWhenNoReaders() throws Throwable { - return new ClusterComposition.Provider.Default( clock, mock( Logger.class ) ) - .getClusterComposition( connection ); - } + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + Clock mockedClock = mock( Clock.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mockedClock, + DEV_NULL_LOGGER, mockedRunner ); - private void keys( final String... keys ) - { - onGetServers( doAnswer( withKeys( keys ) ) ); - } + PooledConnection mockedConn = mock( PooledConnection.class ); + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "WRITE", "one:1337" ), + serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) + } ); + when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); - private void values( final Value[]... records ) - { - onPullAll( doAnswer( withServerList( records ) ) ); - } + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); - private void onGetServers( Stubber stubber ) - { - stubber.when( connection ).run( - eq( ClusterComposition.Provider.GET_SERVERS ), - eq( Collections.emptyMap() ), - any( Collector.class ) ); + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Failure.class ) ); + try + { + response.clusterComposition(); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e.getMessage(), containsString( "no router or reader found in response." ) ); + } } - private void onPullAll( Stubber stubber ) - { - stubber.when( connection ).pullAll( any( Collector.class ) ); - } - public static CollectorAnswer withKeys( final String... keys ) + @Test + public void shouldPropagateConnectionFailureExceptions() throws Exception { - return new CollectorAnswer() - { - @Override - void collect( Collector collector ) - { - collector.keys( keys ); - } - }; - } + // Given + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ), + DEV_NULL_LOGGER ); - public static CollectorAnswer withServerList( final Value[]... records ) - { - return new CollectorAnswer() + PooledConnection mockedConn = mock( PooledConnection.class ); + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "WRITE", "one:1337" ), + serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) + } ); + doThrow( new ServiceUnavailableException( "Connection breaks during cypher execution" ) ) + .when( mockedConn ).run( any( String.class ), anyMap(), any( Collector.class ) ); + + // When & Then + try { - @Override - void collect( Collector collector ) - { - for ( Value[] fields : records ) - { - collector.record( fields ); - } - } - }; + provider.getClusterComposition( mockedConn ); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertThat( e.getMessage(), containsString( "Connection breaks during cypher execution" ) ); + } } - private static abstract class CollectorAnswer implements Answer + @Test + public void shouldReturnSuccessResultWhenNoError() throws Throwable { - abstract void collect( Collector collector ); + // Given + Clock mockedClock = mock( Clock.class ); + GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class ); + ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mockedClock, + DEV_NULL_LOGGER, mockedRunner ); - @Override - public final Object answer( InvocationOnMock invocation ) throws Throwable - { - Collector collector = collector( invocation ); - collect( collector ); - collector.done(); - return null; - } + PooledConnection mockedConn = mock( PooledConnection.class ); + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "READ", "one:1337", "two:1337" ), + serverInfo( "WRITE", "one:1337" ), + serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) + } ); + when( mockedRunner.run( mockedConn ) ).thenReturn( asList( record ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); - private Collector collector( InvocationOnMock invocation ) - { - switch ( invocation.getMethod().getName() ) - { - case "pullAll": - return invocation.getArgumentAt( 0, Collector.class ); - case "run": - return invocation.getArgumentAt( 2, Collector.class ); - default: - throw new UnsupportedOperationException( invocation.getMethod().getName() ); - } - } + // When + ClusterCompositionResponse response = provider.getClusterComposition( mockedConn ); + + // Then + assertThat( response, instanceOf( ClusterCompositionResponse.Success.class ) ); + ClusterComposition cluster = response.clusterComposition(); + assertEquals( 12345 + 100_000, cluster.expirationTimestamp() ); + assertEquals( serverSet( "one:1337", "two:1337" ), cluster.readers() ); + assertEquals( serverSet( "one:1337" ), cluster.writers() ); + assertEquals( serverSet( "one:1337", "two:1337" ), cluster.routers() ); } public static Map serverInfo( String role, String... addresses ) @@ -247,4 +288,5 @@ private static Set serverSet( String... addresses ) } return result; } + } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java new file mode 100644 index 0000000000..b8f754c9bb --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.neo4j.driver.internal.cluster; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +import static java.util.Arrays.asList; + +public final class ClusterCompositionUtil +{ + private ClusterCompositionUtil() {} + + public static final long NEVER_EXPIRE = System.currentTimeMillis() + TimeUnit.HOURS.toMillis( 1 ); + + public static final BoltServerAddress A = new BoltServerAddress( "1111:11" ); + public static final BoltServerAddress B = new BoltServerAddress( "2222:22" ); + public static final BoltServerAddress C = new BoltServerAddress( "3333:33" ); + public static final BoltServerAddress D = new BoltServerAddress( "4444:44" ); + public static final BoltServerAddress E = new BoltServerAddress( "5555:55" ); + public static final BoltServerAddress F = new BoltServerAddress( "6666:66" ); + + public static final List EMPTY = new ArrayList<>(); + + public static final ClusterComposition VALID_CLUSTER_COMPOSITION = + createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ); + + public static final ClusterComposition INVALID_CLUSTER_COMPOSITION = + createClusterComposition( asList( A, B ), EMPTY, asList( D, E ) ); + + @SafeVarargs + public static ClusterComposition createClusterComposition( List... servers ) + { + return createClusterComposition( NEVER_EXPIRE, servers ); + } + + @SafeVarargs + public static ClusterComposition createClusterComposition( long expirationTimestamp, List... + servers ) + { + Set routers = new HashSet<>(); + Set writers = new HashSet<>(); + Set readers = new HashSet<>(); + + switch( servers.length ) + { + case 3: + readers.addAll( servers[2] ); + // no break on purpose + case 2: + writers.addAll( servers[1] ); + // no break on purpose + case 1: + routers.addAll( servers[0] ); + } + return new ClusterComposition( expirationTimestamp, readers, writers, routers ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java new file mode 100644 index 0000000000..ddd93c5849 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import org.neo4j.driver.internal.util.FakeClock; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.B; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; + +public class ClusterRoutingTableTest +{ + @Test + public void shouldReturnStaleIfTtlExpired() throws Exception + { + // Given + FakeClock clock = new FakeClock(); + RoutingTable routingTable = new ClusterRoutingTable( clock ); + + // When + routingTable.update( createClusterComposition( 1000, + asList( A, B ), asList( C ), asList( D, E ) ) ); + clock.progress( 1234 ); + + // Then + assertTrue( routingTable.isStale() ); + } + + @Test + public void shouldReturnStaleIfNoRouter() throws Exception + { + // Given + FakeClock clock = new FakeClock(); + RoutingTable routingTable = new ClusterRoutingTable( clock ); + + // When + routingTable.update( createClusterComposition( EMPTY, asList( C ), asList( D, E ) ) ); + + // Then + assertTrue( routingTable.isStale() ); + } + + @Test + public void shouldReturnStaleIfNoReader() throws Exception + { + // Given + FakeClock clock = new FakeClock(); + RoutingTable routingTable = new ClusterRoutingTable( clock ); + + // When + routingTable.update( createClusterComposition( asList( A, B ), asList( C ), EMPTY ) ); + + // Then + assertTrue( routingTable.isStale() ); + } + + + @Test + public void shouldReturnStatleIfNoWriter() throws Exception + { + // Given + FakeClock clock = new FakeClock(); + RoutingTable routingTable = new ClusterRoutingTable( clock ); + + // When + routingTable.update( createClusterComposition( asList( A, B ), EMPTY, asList( D, E ) ) ); + + // Then + assertTrue( routingTable.isStale() ); + } + + @Test + public void shouldNotStale() throws Exception + { + // Given + FakeClock clock = new FakeClock(); + RoutingTable routingTable = new ClusterRoutingTable( clock ); + + // When + routingTable.update( createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ) ); + + // Then + assertFalse( routingTable.isStale() ); + } + + @Test + public void shouldStaleWhenCreate() throws Throwable + { + // Given + FakeClock clock = new FakeClock(); + + // When + RoutingTable routingTable = new ClusterRoutingTable( clock, A ); + + // Then + assertTrue( routingTable.isStale() ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterTopology.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterTopology.java deleted file mode 100644 index 7c201c467b..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterTopology.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.neo4j.driver.internal.Event; -import org.neo4j.driver.internal.EventHandler; -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.util.Clock; - -import static java.util.Arrays.asList; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.READ; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.ROUTE; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.WRITE; - -class ClusterTopology implements ClusterComposition.Provider -{ - public interface EventSink - { - EventSink VOID = new Adapter(); - - void clusterComposition( BoltServerAddress address, ClusterComposition result ); - - class Adapter implements EventSink - { - @Override - public void clusterComposition( BoltServerAddress address, ClusterComposition result ) - { - } - } - } - - private static final List KEYS = Collections.unmodifiableList( asList( "servers", "ttl" ) ); - private final Map views = new HashMap<>(); - private final EventSink events; - private final Clock clock; - - ClusterTopology( final EventHandler events, Clock clock ) - { - this( events == null ? null : new EventSink() - { - @Override - public void clusterComposition( BoltServerAddress address, ClusterComposition result ) - { - events.add( new CompositionRequest( Thread.currentThread(), address, result ) ); - } - }, clock ); - } - - ClusterTopology( EventSink events, Clock clock ) - { - this.events = events == null ? EventSink.VOID : events; - this.clock = clock; - } - - public View on( String host, int port ) - { - View view = new View(); - views.put( new BoltServerAddress( host, port ), view ); - return view; - } - - public enum Role - { - READ, - WRITE, - ROUTE - } - - public static class View - { - private long ttl = 60_000; - private final Set readers = new HashSet<>(), - writers = new HashSet<>(), - routers = new HashSet<>(); - - public View ttlSeconds( long ttl ) - { - this.ttl = ttl * 1000; - return this; - } - - public View provide( String host, int port, Role... roles ) - { - for ( Role role : roles ) - { - servers( role ).add( new BoltServerAddress( host, port ) ); - } - return this; - } - - private Set servers( Role role ) - { - switch ( role ) - { - case READ: - return readers; - case WRITE: - return writers; - case ROUTE: - return routers; - default: - throw new IllegalArgumentException( role.name() ); - } - } - - ClusterComposition composition( long now ) - { - return new ClusterComposition( now + ttl, servers( READ ), servers( WRITE ), servers( ROUTE ) ); - } - } - - @Override - public ClusterComposition getClusterComposition( Connection connection ) - { - BoltServerAddress router = connection.boltServerAddress(); - View view = views.get( router ); - ClusterComposition result = view == null ? null : view.composition( clock.millis() ); - events.clusterComposition( router, result ); - return result; - } - - public static final class CompositionRequest extends Event - { - final Thread thread; - final BoltServerAddress address; - private final ClusterComposition result; - - private CompositionRequest( Thread thread, BoltServerAddress address, ClusterComposition result ) - { - this.thread = thread; - this.address = address; - this.result = result; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.clusterComposition( address, result ); - } - - public static Matcher clusterComposition( - final Matcher thread, - final Matcher address, - final Matcher result ) - { - return new TypeSafeMatcher() - { - @Override - protected boolean matchesSafely( CompositionRequest event ) - { - return thread.matches( event.thread ) - && address.matches( event.address ) - && result.matches( event.result ); - } - - @Override - public void describeTo( Description description ) - { - description.appendText( "a successful cluster composition request on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> from address <" ) - .appendDescriptionOf( address ) - .appendText( "> returning <" ) - .appendDescriptionOf( result ) - .appendText( ">" ); - } - }; - } - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 06fc733584..84ad2c4ec7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -18,616 +18,120 @@ */ package org.neo4j.driver.internal.cluster; -import org.hamcrest.Matcher; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; +import org.mockito.InOrder; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; -import org.neo4j.driver.internal.EventHandler; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StubConnectionPool; -import org.neo4j.driver.internal.util.FakeClock; -import org.neo4j.driver.internal.util.MatcherFactory; -import org.neo4j.driver.v1.EventLogger; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.util.Function; - -import static org.hamcrest.Matchers.any; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.sameInstance; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.READ; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.ROUTE; -import static org.neo4j.driver.internal.cluster.ClusterTopology.Role.WRITE; -import static org.neo4j.driver.internal.spi.StubConnectionPool.Event.acquire; -import static org.neo4j.driver.internal.spi.StubConnectionPool.Event.connectionFailure; -import static org.neo4j.driver.internal.util.FakeClock.Event.sleep; -import static org.neo4j.driver.internal.util.MatcherFactory.inAnyOrder; -import static org.neo4j.driver.internal.util.MatcherFactory.matches; -import static org.neo4j.driver.v1.EventLogger.Entry.message; -import static org.neo4j.driver.v1.EventLogger.Level.INFO; +import org.neo4j.driver.internal.spi.ConnectionPool; + +import static java.util.Arrays.asList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; public class LoadBalancerTest { - private static final long RETRY_TIMEOUT_DELAY = 5_000; - private static final int MAX_ROUTING_FAILURES = 5; - @Rule - public final TestRule printEventsOnFailure = new TestRule() - { - @Override - public Statement apply( final Statement base, Description description ) - { - return new Statement() - { - @Override - public void evaluate() throws Throwable - { - try - { - base.evaluate(); - } - catch ( Throwable e ) - { - events.printEvents( System.err ); - throw e; - } - } - }; - } - }; - private final EventHandler events = new EventHandler(); - private final FakeClock clock = new FakeClock( events, true ); - private final EventLogger log = new EventLogger( events, null, INFO ); - private final StubConnectionPool connections = new StubConnectionPool( clock, events, null ); - private final ClusterTopology cluster = new ClusterTopology( events, clock ); - - private LoadBalancer seedLoadBalancer( String host, int port ) throws Exception - { - RoutingSettings defaultSettings = new RoutingSettings( MAX_ROUTING_FAILURES, RETRY_TIMEOUT_DELAY ); - return seedLoadBalancer( host, port, defaultSettings ); - } - - private LoadBalancer seedLoadBalancer( String host, int port, RoutingSettings settings ) throws Exception - { - ClusterRoutingTable routingTable = new ClusterRoutingTable( clock, new BoltServerAddress( host, port ) ); - return new LoadBalancer( log, connections, routingTable, new Rediscovery( settings, clock, log, cluster ) ); - } - - @Test - public void shouldConnectToRouter() throws Exception - { - // given - connections.up( "some.host", 1337 ); - cluster.on( "some.host", 1337 ) - .provide( "some.host", 1337, READ, WRITE, ROUTE ) - .provide( "another.host", 1337, ROUTE ); - - // when - Connection connection = seedLoadBalancer( "some.host", 1337 ).acquireReadConnection(); - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 1 ) ); - events.assertContains( acquiredConnection( "some.host", 1337, connection ) ); - } - - @Test - public void shouldConnectToRouterOnInitialization() throws Exception - { - // given - connections.up( "some.host", 1337 ); - cluster.on( "some.host", 1337 ) - .provide( "some.host", 1337, READ, WRITE, ROUTE ) - .provide( "another.host", 1337, ROUTE ); - - // when - seedLoadBalancer( "some.host", 1337 ); - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 1 ) ); - } - - @Test - public void shouldReconnectWithRouterAfterTtlExpires() throws Exception - { - // given - coreClusterOn( 20, "some.host", 1337, "another.host" ); - connections.up( "some.host", 1337 ).up( "another.host", 1337 ); - - LoadBalancer routing = seedLoadBalancer( "some.host", 1337 ); - - // when - clock.progress( 25_000 ); // will cause TTL timeout - Connection connection = routing.acquireWriteConnection(); - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 2 ) ); - events.assertContains( acquiredConnection( "some.host", 1337, connection ) ); - } - - @Test - public void shouldNotReconnectWithRouterWithinTtl() throws Exception - { - // given - coreClusterOn( 20, "some.host", 1337, "another.host" ); - connections.up( "some.host", 1337 ).up( "another.host", 1337 ); - - LoadBalancer routing = seedLoadBalancer( "some.host", 1337 ); - - // when - clock.progress( 15_000 ); // not enough to cause TTL timeout - routing.acquireWriteConnection(); - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 1 ) ); - } - - @Test - public void shouldReconnectWithRouterIfOnlyOneRouterIsFound() throws Exception - { - // given - cluster.on( "here", 1337 ) - .ttlSeconds( 20 ) - .provide( "here", 1337, READ, WRITE, ROUTE ); - connections.up( "here", 1337 ); - - LoadBalancer routing = seedLoadBalancer( "here", 1337 ); - - // when - routing.acquireReadConnection(); - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 2 ) ); - } - - @Test - public void shouldReconnectWithRouterIfNoReadersAreAvailable() throws Exception - { - // given - cluster.on( "one", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, WRITE, ROUTE ) - .provide( "two", 1337, ROUTE ); - cluster.on( "two", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, WRITE, ROUTE ) - .provide( "two", 1337, READ, ROUTE ); - connections.up( "one", 1337 ).up( "two", 1337 ); - - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 1 ) ); - - cluster.on( "one", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, WRITE, ROUTE ) - .provide( "two", 1337, READ, ROUTE ); - - // when - routing.acquireWriteConnection(); // we should require the presence of a READER even though we ask for a WRITER - - // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 2 ) ); - } - @Test - public void shouldReconnectWithRouterIfNoWritersAreAvailable() throws Exception + public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale() throws Exception { // given - cluster.on( "one", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, ROUTE ) - .provide( "two", 1337, READ, WRITE, ROUTE ); - cluster.on( "two", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, ROUTE ) - .provide( "two", 1337, READ, WRITE, ROUTE ); - connections.up( "one", 1337 ); - - events.registerHandler( StubConnectionPool.EventSink.class, new StubConnectionPool.EventSink.Adapter() - { - @Override - public void connectionFailure( BoltServerAddress address ) - { - connections.up( "two", 1337 ); - } - } ); - - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 1 ) ); - - cluster.on( "one", 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, WRITE, ROUTE ) - .provide( "two", 1337, READ, ROUTE ); + ConnectionPool conns = mock( ConnectionPool.class ); + RoutingTable routingTable = mock( RoutingTable.class ); + Rediscovery rediscovery = mock( Rediscovery.class ); + when( routingTable.isStale() ).thenReturn( true ); + HashSet set = new HashSet<>( asList( new BoltServerAddress( "abc", 12 ) ) ); + when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set ); // when - routing.acquireWriteConnection(); + LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, conns, routingTable, rediscovery ); // then - events.assertCount( any( ClusterTopology.CompositionRequest.class ), equalTo( 2 ) ); + InOrder inOrder = inOrder( rediscovery, routingTable, conns ); + inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable ); + inOrder.verify( routingTable ).update( any( ClusterComposition.class ) ); + inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) ); } - @Test - public void shouldDropRouterUnableToPerformRoutingTask() throws Exception - { - // given - connections.up( "some.host", 1337 ) - .up( "other.host", 1337 ) - .up( "another.host", 1337 ); - cluster.on( "some.host", 1337 ) - .ttlSeconds( 20 ) - .provide( "some.host", 1337, READ, WRITE, ROUTE ) - .provide( "other.host", 1337, READ, ROUTE ); - cluster.on( "another.host", 1337 ) - .ttlSeconds( 20 ) - .provide( "some.host", 1337, READ, WRITE, ROUTE ) - .provide( "another.host", 1337, READ, ROUTE ); - events.registerHandler( ClusterTopology.EventSink.class, new ClusterTopology.EventSink.Adapter() - { - @Override - public void clusterComposition( BoltServerAddress address, ClusterComposition result ) - { - if ( result == null ) - { - connections.up( "some.host", 1337 ); - cluster.on( "some.host", 1337 ) - .ttlSeconds( 20 ) - .provide( "some.host", 1337, READ, WRITE, ROUTE ) - .provide( "another.host", 1337, READ, ROUTE ); - } - } - } ); - - LoadBalancer routing = seedLoadBalancer( "some.host", 1337 ); - - // when - connections.down( "some.host", 1337 ); - clock.progress( 25_000 ); // will cause TTL timeout - Connection connection = routing.acquireWriteConnection(); - - // then - events.assertCount( - message( - equalTo( INFO ), - equalTo( "Server unable to perform routing capability, " - + "dropping from list of routers." ) ), - equalTo( 1 ) ); - events.assertContains( acquiredConnection( "some.host", 1337, connection ) ); - } @Test - public void shouldConnectToRoutingServersInTimeoutOrder() throws Exception + public void shouldEnsureRoutingOnInitialization() throws Exception { - // given - coreClusterOn( 20, "one", 1337, "two", "tre" ); - connections.up( "one", 1337 ); - events.registerHandler( StubConnectionPool.EventSink.class, new StubConnectionPool.EventSink.Adapter() - { - int failed; - + // given & when + final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 ); + LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, mock( ConnectionPool.class ), + mock( RoutingTable.class ), mock( Rediscovery.class ) ) { @Override - public void connectionFailure( BoltServerAddress address ) + public void ensureRouting() { - if ( ++failed >= 9 ) // three times per server - { - for ( String host : new String[] {"one", "two", "tre"} ) - { - connections.up( host, 1337 ); - } - } + ensureRoutingCounter.incrementAndGet(); } - } ); - - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - - // when - connections.down( "one", 1337 ); - clock.progress( 25_000 ); // will cause TTL timeout - routing.acquireWriteConnection(); - - // then - MatcherFactory failedAttempts = inAnyOrder( - connectionFailure( "one", 1337 ), - connectionFailure( "two", 1337 ), - connectionFailure( "tre", 1337 ) ); - events.assertContains( - failedAttempts, - matches( sleep( RETRY_TIMEOUT_DELAY ) ), - failedAttempts, - matches( sleep( 2 * RETRY_TIMEOUT_DELAY ) ), - failedAttempts, - matches( sleep( 4 * RETRY_TIMEOUT_DELAY ) ), - matches( ClusterTopology.CompositionRequest.clusterComposition( - any( Thread.class ), - any( BoltServerAddress.class ), - any( ClusterComposition.class ) ) ) ); - } - - @Test - public void shouldTryConfiguredMaxRoutingFailures() throws Exception - { - // given - int port = 1337; - int maxRoutingFailures = 7; - RoutingSettings settings = new RoutingSettings( maxRoutingFailures, 10 ); - - coreClusterOn( 20, "one", port, "two" ); - connections.up( "one", port ); - - LoadBalancer routing = seedLoadBalancer( "one", port, settings ); - - // when - connections.down( "one", port ); - clock.progress( 25_000 ); // will cause TTL timeout - - try - { - routing.acquireWriteConnection(); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( ServiceUnavailableException.class ) ); - } - - // then - events.assertCount( connectionFailure( "one", port ), equalTo( maxRoutingFailures ) ); - events.assertCount( connectionFailure( "two", port ), equalTo( maxRoutingFailures ) ); - } - - @Test - public void shouldFailIfEnoughConnectionAttemptsFail() throws Exception - { - // when - try - { - seedLoadBalancer( "one", 1337 ); - fail( "expected failure" ); - } - // then - catch ( ServiceUnavailableException e ) - { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); - } - } - - private static final Function READ_SERVERS = new Function() - { - @Override - public Connection apply( LoadBalancer routing ) - { - return routing.acquireReadConnection(); - } - }; - - @Test - public void shouldRoundRobinAmongReadServers() throws Exception - { - shouldRoundRobinAmong( READ_SERVERS ); - } - - private static final Function WRITE_SERVERS = new Function() - { - @Override - public Connection apply( LoadBalancer routing ) - { - return routing.acquireWriteConnection(); - } - }; - - @Test - public void shouldRoundRobinAmongWriteServers() throws Exception - { - shouldRoundRobinAmong( WRITE_SERVERS ); - } - - private void shouldRoundRobinAmong( Function acquire ) throws Exception - { - // given - for ( String host : new String[] {"one", "two", "tre"} ) - { - connections.up( host, 1337 ); - cluster.on( host, 1337 ) - .ttlSeconds( 20 ) - .provide( "one", 1337, READ, WRITE, ROUTE ) - .provide( "two", 1337, READ, WRITE, ROUTE ) - .provide( "tre", 1337, READ, WRITE, ROUTE ); - } - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - - // when - Connection a = acquire.apply( routing ); - Connection b = acquire.apply( routing ); - Connection c = acquire.apply( routing ); - assertNotEquals( a.boltServerAddress(), b.boltServerAddress() ); - assertNotEquals( b.boltServerAddress(), c.boltServerAddress() ); - assertNotEquals( c.boltServerAddress(), a.boltServerAddress() ); - assertEquals( a.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); - assertEquals( b.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); - assertEquals( c.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); - assertEquals( a.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); - assertEquals( b.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); - assertEquals( c.boltServerAddress(), acquire.apply( routing ).boltServerAddress() ); + }; // then - MatcherFactory acquireConnections = - inAnyOrder( acquire( "one", 1337 ), acquire( "two", 1337 ), acquire( "tre", 1337 ) ); - events.assertContains( acquireConnections, acquireConnections, acquireConnections ); - events.assertContains( inAnyOrder( acquire( a ), acquire( b ), acquire( c ) ) ); + assertThat( ensureRoutingCounter.get(), equalTo( 1 ) ); } @Test - public void shouldRoundRobinAmongRouters() throws Exception + public void shouldEnsureRoutingWhenAcquireConn() throws Exception { // given - coreClusterOn( 20, "one", 1337, "two", "tre" ); - connections.up( "one", 1337 ).up( "two", 1337 ).up( "tre", 1337 ); + Connection writerConn = mock( Connection.class ); + Connection readConn = mock( Connection.class ); + LoadBalancer balancer = setupLoadBalancer( writerConn, readConn ); + LoadBalancer spy = spy( balancer ); // when - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - for ( int i = 1; i < 9; i++ ) - { - clock.progress( 25_000 ); - routing.acquireReadConnection(); - } + Connection connection = spy.acquireReadConnection(); // then - final List hosts = new ArrayList<>(); - events.forEach( new ClusterTopology.EventSink() - { - @Override - public void clusterComposition( BoltServerAddress address, ClusterComposition result ) - { - hosts.add( address.host() ); - } - } ); - assertEquals( 9, hosts.size() ); - assertEquals( hosts.get( 0 ), hosts.get( 3 ) ); - assertEquals( hosts.get( 1 ), hosts.get( 4 ) ); - assertEquals( hosts.get( 2 ), hosts.get( 5 ) ); - assertEquals( hosts.get( 0 ), hosts.get( 6 ) ); - assertEquals( hosts.get( 1 ), hosts.get( 7 ) ); - assertEquals( hosts.get( 2 ), hosts.get( 8 ) ); - assertNotEquals( hosts.get( 0 ), hosts.get( 1 ) ); - assertNotEquals( hosts.get( 1 ), hosts.get( 2 ) ); - assertNotEquals( hosts.get( 2 ), hosts.get( 0 ) ); + verify( spy ).ensureRouting(); + assertThat( connection, equalTo( readConn ) ); } @Test - public void shouldForgetPreviousServersOnRerouting() throws Exception + public void shouldAcquireReaderOrWriterConn() throws Exception { - // given - connections.up( "one", 1337 ) - .up( "two", 1337 ); - cluster.on( "one", 1337 ) - .ttlSeconds( 20 ) - .provide( "bad", 1337, READ, WRITE, ROUTE ) - .provide( "one", 1337, READ, ROUTE ); - - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); + Connection writerConn = mock( Connection.class ); + Connection readConn = mock( Connection.class ); + LoadBalancer balancer = setupLoadBalancer( writerConn, readConn ); - // when - coreClusterOn( 20, "one", 1337, "two" ); - clock.progress( 25_000 ); // will cause TTL timeout - Connection ra = routing.acquireReadConnection(); - Connection rb = routing.acquireReadConnection(); - Connection w = routing.acquireWriteConnection(); - assertNotEquals( ra.boltServerAddress(), rb.boltServerAddress() ); - assertEquals( ra.boltServerAddress(), routing.acquireReadConnection().boltServerAddress() ); - assertEquals( rb.boltServerAddress(), routing.acquireReadConnection().boltServerAddress() ); - assertEquals( w.boltServerAddress(), routing.acquireWriteConnection().boltServerAddress() ); - assertEquals( ra.boltServerAddress(), routing.acquireReadConnection().boltServerAddress() ); - assertEquals( rb.boltServerAddress(), routing.acquireReadConnection().boltServerAddress() ); - assertEquals( w.boltServerAddress(), routing.acquireWriteConnection().boltServerAddress() ); - - // then - events.assertNone( acquire( "bad", 1337 ) ); - } - - @Test - public void shouldFailIfNoRouting() throws Exception - { - // given - connections.up( "one", 1337 ); - cluster.on( "one", 1337 ) - .provide( "one", 1337, READ, WRITE ); - - // when - try - { - seedLoadBalancer( "one", 1337 ); - fail( "expected failure" ); - } - // then - catch ( ServiceUnavailableException e ) - { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); - } + // when & then + assertThat( balancer.acquireReadConnection(), equalTo( readConn ) ); + assertThat( balancer.acquireWriteConnection(), equalTo( writerConn ) ); } - @Test - public void shouldFailIfNoWriting() throws Exception + private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readConn ) { - // given - connections.up( "one", 1337 ); - cluster.on( "one", 1337 ) - .provide( "one", 1337, READ, ROUTE ); + BoltServerAddress writer = mock( BoltServerAddress.class ); + BoltServerAddress reader = mock( BoltServerAddress.class ); - // when - try - { - seedLoadBalancer( "one", 1337 ); - fail( "expected failure" ); - } - // then - catch ( ServiceUnavailableException e ) - { - assertEquals( "Could not perform discovery. No routing servers available.", e.getMessage() ); - } - } + ConnectionPool connPool = mock( ConnectionPool.class ); + when( connPool.acquire( writer ) ).thenReturn( writerConn ); + when( connPool.acquire( reader ) ).thenReturn( readConn ); - @Test - public void shouldNotForgetAddressForRoutingPurposesWhenUnavailableForOtherUse() throws Exception - { - // given - cluster.on( "one", 1337 ) - .provide( "one", 1337, READ, ROUTE ) - .provide( "two", 1337, WRITE, ROUTE ); - cluster.on( "two", 1337 ) - .provide( "one", 1337, READ, ROUTE ) - .provide( "two", 1337, WRITE, ROUTE ); - connections.up( "one", 1337 ); + RoundRobinAddressSet writerAddrs = mock( RoundRobinAddressSet.class ); + when( writerAddrs.next() ).thenReturn( writer ); - LoadBalancer routing = seedLoadBalancer( "one", 1337 ); - connections.down( "one", 1337 ); - events.registerHandler( FakeClock.EventSink.class, new FakeClock.EventSink.Adapter() - { - @Override - public void sleep( long timestamp, long millis ) - { - connections.up( "two", 1337 ); - } - } ); - - // when - Connection connection = routing.acquireWriteConnection(); + RoundRobinAddressSet readerAddrs = mock( RoundRobinAddressSet.class ); + when( readerAddrs.next() ).thenReturn( reader ); - // then - assertEquals( new BoltServerAddress( "two", 1337 ), connection.boltServerAddress() ); - } + RoutingTable routingTable = mock( RoutingTable.class ); + when( routingTable.readers() ).thenReturn( readerAddrs ); + when( routingTable.writers() ).thenReturn( writerAddrs ); - private void coreClusterOn( int ttlSeconds, String leader, int port, String... others ) - { - for ( int i = 0; i <= others.length; i++ ) - { - String host = (i == others.length) ? leader : others[i]; - ClusterTopology.View view = cluster.on( host, port ) - .ttlSeconds( ttlSeconds ) - .provide( leader, port, READ, WRITE, ROUTE ); - for ( String other : others ) - { - view.provide( other, port, READ, ROUTE ); - } - } - } + LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, connPool, + routingTable, mock( Rediscovery.class ) ) ; - private Matcher acquiredConnection( - String host, int port, Connection connection ) - { - return acquire( - any( Thread.class ), - equalTo( new BoltServerAddress( host, port ) ), - sameInstance( connection ) ); + return balancer; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java new file mode 100644 index 0000000000..e986d3ec08 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -0,0 +1,400 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.net.pooling.PooledConnection; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.exceptions.ProtocolException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; + +import static java.util.Arrays.asList; +import static junit.framework.TestCase.fail; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.B; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.INVALID_CLUSTER_COMPOSITION; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; +import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; + +@RunWith( Enclosed.class ) +public class RediscoveryTest +{ + + private static ClusterCompositionResponse.Success success( ClusterComposition cluster ) + { + return new ClusterCompositionResponse.Success( cluster ); + } + + private static ClusterCompositionResponse.Failure failure( RuntimeException e ) + { + return new ClusterCompositionResponse.Failure( e ); + } + + public static class RoutingSettingsTest + { + @Test + public void shouldTryConfiguredMaxRoutingFailures() throws Exception + { + // given + int maxRoutingFailures = 7; + RoutingSettings settings = new RoutingSettings( maxRoutingFailures, 10 ); + + ListBasedRoutingTable routingTable = new ListBasedRoutingTable( asList( A ) ); + + ClusterCompositionProvider mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( any( Connection.class ) ) ) + .thenReturn( success( INVALID_CLUSTER_COMPOSITION ) ); + + Rediscovery rediscovery = new Rediscovery( settings, mock( Clock.class ), DEV_NULL_LOGGER, mockedProvider ); + + // when + try + { + rediscovery.lookupRoutingTable( mock( ConnectionPool.class ), routingTable ); + fail("Should fail as failed to discovery"); + } + catch( ServiceUnavailableException e ) + { + assertThat( e.getMessage(), containsString( "No routing servers available" ) ); + } + // then + + verify( mockedProvider, times( maxRoutingFailures ) ).getClusterComposition( any( Connection.class ) ); + + } + } + + public static class FailedToConnectTest + { + @Test + public void shouldForgetRouterAndTryNextRouterWhenFailedToConnect() throws Throwable + { + // Given + ListBasedRoutingTable routingTable = new ListBasedRoutingTable( asList( A, B ) ); + + PooledConnection healthyConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenThrow( new ServiceUnavailableException( "failed to connect" ) ); + when( mockedConnections.acquire( B ) ).thenReturn( healthyConn ); + + ClusterCompositionProvider + mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( healthyConn ) ) + .thenReturn( success( ClusterCompositionUtil.VALID_CLUSTER_COMPOSITION ) ); + + // When + ClusterComposition clusterComposition = rediscover( mockedConnections, routingTable, mockedProvider ); + + // Then + assertThat( routingTable.removedRouters.size(), equalTo( 1 ) ); + assertThat( routingTable.removedRouters.get( 0 ), equalTo( A ) ); + assertThat( clusterComposition, equalTo( ClusterCompositionUtil.VALID_CLUSTER_COMPOSITION ) ); + } + } + + public static class ProcedureNotFoundTest + { + @Test + public void shouldThrowServiceUnavailableWhenNoProcedureFound() throws Throwable + { + // Given + RoutingTable routingTable = new ListBasedRoutingTable( asList( A ) ); + + PooledConnection healthyConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenReturn( healthyConn ); + + + ClusterCompositionProvider + mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( healthyConn ) ) + .thenReturn( failure( new ServiceUnavailableException( "No such procedure" ) ) ); + + // When & When + try + { + rediscover( mockedConnections, routingTable, mockedProvider ); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertThat( e.getMessage(), startsWith( "No such procedure" ) ); + } + } + } + + @RunWith( Parameterized.class ) + public static class NoWritersTest + { + + @Parameters(name = "Rediscovery result: {0}") + public static Collection data() { + return asList(new Object[][] { + { "([A], [C], [])", createClusterComposition( asList( A ), ClusterCompositionUtil.EMPTY, asList( C ) ) }, + { "([A], [CD], [])", createClusterComposition( asList( A ), ClusterCompositionUtil.EMPTY, asList( C, D ) ) }, + { "([AB], [C], [])", createClusterComposition( asList( A, B ), ClusterCompositionUtil.EMPTY, asList( C ) ) }, + { "([AB], [CD], [])", createClusterComposition( asList( A, B ), ClusterCompositionUtil.EMPTY, asList( C, D ) )} + }); + } + + private ClusterComposition noWriters; + + public NoWritersTest( String testName, ClusterComposition noWriters ) + { + this.noWriters = noWriters; + } + + @Test + public void shouldTryNextRouterWhenNoWriters() throws Throwable + { + // Given + RoutingTable routingTable = new ListBasedRoutingTable( asList( A, B ) ); + + PooledConnection noWriterConn = mock( PooledConnection.class ); + PooledConnection healthyConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenReturn( noWriterConn ); + when( mockedConnections.acquire( B ) ).thenReturn( healthyConn ); + + ClusterCompositionProvider + mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( noWriterConn ) ).thenReturn( success( noWriters ) ); + when( mockedProvider.getClusterComposition( healthyConn ) ) + .thenReturn( success( ClusterCompositionUtil.VALID_CLUSTER_COMPOSITION ) ); + + // When + ClusterComposition clusterComposition = rediscover( mockedConnections, routingTable, mockedProvider ); + + // Then + assertThat( clusterComposition, equalTo( ClusterCompositionUtil.VALID_CLUSTER_COMPOSITION ) ); + } + + @Test + public void shouldThrowServiceUnavailableWhenNoNextRouter() throws Throwable + { + // Given + RoutingTable routingTable = new ListBasedRoutingTable( asList( A ) ); + + PooledConnection noWriterConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenReturn( noWriterConn ); + + ClusterCompositionProvider + mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( noWriterConn ) ).thenReturn( success( noWriters ) ); + + // When & THen + try + { + rediscover( mockedConnections, routingTable, mockedProvider ); + fail( "Expecting a failure but not triggered." ); + } + catch( Exception e ) + { + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertThat( e.getMessage(), startsWith( "Could not perform discovery. No routing servers available." ) ); + } + } + } + + @RunWith( Parameterized.class ) + public static class AtLeastOneOfEachTest + { + @Parameters(name = "Rediscovery result: {0}") + public static Collection data() { + return asList(new Object[][] { + { "([A], [C], [E])", createClusterComposition( asList( A ), asList( C ), asList( E ) ) }, + { "([AB], [C], [E])", createClusterComposition( asList( A, B ), asList( C ), asList( E ) ) }, + { "([A], [CD], [E])", createClusterComposition( asList( A ), asList( C, D ), asList( E ) ) }, + { "([AB], [CD], [E])", createClusterComposition( asList( A, B ), asList( C, D ), asList( E ) ) }, + { "([A], [C], [EF])", createClusterComposition( asList( A ), asList( C ), asList( E, F ) ) }, + { "([AB], [C], [EF])", createClusterComposition( asList( A, B ), asList( C ), asList( E, F ) ) }, + { "([A], [CD], [EF])", createClusterComposition( asList( A ), asList( C, D ), asList( E, F ) ) }, + { "([AB], [CD], [EF])", createClusterComposition( asList( A, B ), asList( C, D ), asList( E, F ) )} + }); + } + + private ClusterComposition atLeastOneOfEach; + + public AtLeastOneOfEachTest( String testName, ClusterComposition atLeastOneOfEach ) + { + this.atLeastOneOfEach = atLeastOneOfEach; + } + + @Test + public void shouldUpdateRoutingTableWithTheNewOne() throws Throwable + { + // Given + RoutingTable routingTable = new ListBasedRoutingTable( asList( A ) ); + + PooledConnection healthyConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenReturn( healthyConn ); + + ClusterCompositionProvider + mockedProvider = mock( ClusterCompositionProvider.class ); + when( mockedProvider.getClusterComposition( healthyConn ) ).thenReturn( success( atLeastOneOfEach ) ); + + // When + ClusterComposition clusterComposition = rediscover( mockedConnections, routingTable, mockedProvider ); + + // Then + assertThat( clusterComposition, equalTo( atLeastOneOfEach ) ); + } + } + + public static class IllegalResponseTest + { + @Test + public void shouldProtocolErrorWhenFailedToPaseClusterCompositin() throws Throwable + { + // Given + RoutingTable routingTable = new ListBasedRoutingTable( asList( A ) ); + + PooledConnection healthyConn = mock( PooledConnection.class ); + ConnectionPool mockedConnections = mock( ConnectionPool.class ); + when( mockedConnections.acquire( A ) ).thenReturn( healthyConn ); + + + ClusterCompositionProvider mockedProvider = mock( ClusterCompositionProvider.class ); + ProtocolException exception = new ProtocolException( "Failed to parse result" ); + when( mockedProvider.getClusterComposition( healthyConn ) ).thenReturn( failure( exception ) ); + + // When & When + try + { + rediscover( mockedConnections, routingTable, mockedProvider ); + fail( "Expecting a failure but not triggered." ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ProtocolException.class ) ); + assertThat( e, equalTo( (Exception) exception ) ); + } + } + } + + private static ClusterComposition rediscover( ConnectionPool connections, RoutingTable routingTable, + ClusterCompositionProvider provider ) throws InterruptedException + { + + RoutingSettings defaultRoutingSettings = new RoutingSettings( 1, 0 ); + Clock mockedClock = mock( Clock.class ); + Logger mockedLogger = mock( Logger.class ); + + Rediscovery rediscovery = new Rediscovery( defaultRoutingSettings, mockedClock, mockedLogger, provider ); + return rediscovery.lookupRoutingTable( connections, routingTable ); + } + + private static class ListBasedRoutingTable implements RoutingTable + { + private final List routers; + private int index; + private final List removedRouters = new ArrayList<>(); + + public ListBasedRoutingTable( List routers ) + { + this.routers = routers; + this.index = 0; + } + + @Override + public boolean isStale() + { + throw new UnsupportedOperationException(); + } + + @Override + public HashSet update( ClusterComposition cluster ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void forget( BoltServerAddress address ) + { + throw new UnsupportedOperationException(); + } + + @Override + public RoundRobinAddressSet readers() + { + throw new UnsupportedOperationException(); + } + + @Override + public RoundRobinAddressSet writers() + { + throw new UnsupportedOperationException(); + } + + @Override + public BoltServerAddress nextRouter() + { + return routers.get( (index ++) % routerSize() ); + } + + @Override + public int routerSize() + { + return routers.size(); + } + + @Override + public void removeWriter( BoltServerAddress toRemove ) + { + throw new UnsupportedOperationException(); + } + + @Override + public void removeRouter( BoltServerAddress toRemove ) + { + removedRouters.add( toRemove ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/spi/StubConnectionPool.java b/driver/src/test/java/org/neo4j/driver/internal/spi/StubConnectionPool.java deleted file mode 100644 index 58843c4389..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/spi/StubConnectionPool.java +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.spi; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; - -import org.neo4j.driver.internal.EventHandler; -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.net.pooling.PooledConnection; -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import org.neo4j.driver.v1.util.Function; - -import static org.hamcrest.Matchers.any; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.sameInstance; - -public class StubConnectionPool implements ConnectionPool -{ - public interface EventSink - { - EventSink VOID = new Adapter(); - - void acquire( BoltServerAddress address, Connection connection ); - - void release( BoltServerAddress address, Connection connection ); - - void connectionFailure( BoltServerAddress address ); - - void purge( BoltServerAddress address, boolean connected ); - - void close( Collection connected ); - - class Adapter implements EventSink - { - @Override - public void acquire( BoltServerAddress address, Connection connection ) - { - } - - @Override - public void release( BoltServerAddress address, Connection connection ) - { - } - - @Override - public void connectionFailure( BoltServerAddress address ) - { - } - - @Override - public void purge( BoltServerAddress address, boolean connected ) - { - } - - @Override - public void close( Collection connected ) - { - } - } - } - - private final Clock clock; - private final EventSink events; - private final Function factory; - private static final Function NULL_FACTORY = - new Function() - { - @Override - public Connection apply( BoltServerAddress boltServerAddress ) - { - return null; - } - }; - - public StubConnectionPool( Clock clock, final EventHandler events, Function factory ) - { - this( clock, events == null ? null : new EventSink() - { - @Override - public void acquire( BoltServerAddress address, Connection connection ) - { - events.add( new AcquireEvent( Thread.currentThread(), address, connection ) ); - } - - @Override - public void release( BoltServerAddress address, Connection connection ) - { - events.add( new ReleaseEvent( Thread.currentThread(), address, connection ) ); - } - - @Override - public void connectionFailure( BoltServerAddress address ) - { - events.add( new ConnectionFailureEvent( Thread.currentThread(), address ) ); - } - - @Override - public void purge( BoltServerAddress address, boolean connected ) - { - events.add( new PurgeEvent( Thread.currentThread(), address, connected ) ); - } - - @Override - public void close( Collection connected ) - { - events.add( new CloseEvent( Thread.currentThread(), connected ) ); - } - }, factory ); - } - - public StubConnectionPool( Clock clock, EventSink events, Function factory ) - { - this.clock = clock; - this.events = events == null ? EventSink.VOID : events; - this.factory = factory == null ? NULL_FACTORY : factory; - } - - private enum State - { - AVAILABLE, - CONNECTED, - PURGED - } - - private final ConcurrentMap hosts = new ConcurrentHashMap<>(); - - public StubConnectionPool up( String host, int port ) - { - hosts.putIfAbsent( new BoltServerAddress( host, port ), State.AVAILABLE ); - return this; - } - - public StubConnectionPool down( String host, int port ) - { - hosts.remove( new BoltServerAddress( host, port ) ); - return this; - } - - @Override - public Connection acquire( BoltServerAddress address ) - { - if ( hosts.replace( address, State.CONNECTED ) == null ) - { - events.connectionFailure( address ); - throw new ServiceUnavailableException( "Host unavailable: " + address ); - } - Connection connection = new StubConnection( address, factory.apply( address ), events, clock ); - events.acquire( address, connection ); - return connection; - } - - @Override - public void purge( BoltServerAddress address ) - { - State state = hosts.replace( address, State.PURGED ); - events.purge( address, state == State.CONNECTED ); - } - - @Override - public boolean hasAddress( BoltServerAddress address ) - { - return State.CONNECTED == hosts.get( address ); - } - - @Override - public void close() - { - List connected = new ArrayList<>( hosts.size() ); - for ( Map.Entry entry : hosts.entrySet() ) - { - if ( entry.getValue() == State.CONNECTED ) - { - connected.add( entry.getKey() ); - } - } - events.close( connected ); - hosts.clear(); - } - - private static class StubConnection extends PooledConnection - { - private final BoltServerAddress address; - - StubConnection( - final BoltServerAddress address, - final Connection delegate, - final EventSink events, - Clock clock ) - { - super( delegate, new Consumer() - { - @Override - public void accept( PooledConnection self ) - { - events.release( address, self ); - if ( delegate != null ) - { - delegate.close(); - } - } - }, clock ); - this.address = address; - } - - @Override - public String toString() - { - return String.format( "StubConnection{%s}@%s", address, System.identityHashCode( this ) ); - } - - @Override - public BoltServerAddress boltServerAddress() - { - return address; - } - } - - public static abstract class Event extends org.neo4j.driver.internal.Event - { - final Thread thread; - - private Event( Thread thread ) - { - this.thread = thread; - } - - public static Matcher acquire( String host, int port ) - { - return acquire( - any( Thread.class ), - equalTo( new BoltServerAddress( host, port ) ), - any( Connection.class ) ); - } - - public static Matcher acquire( Connection connection ) - { - return acquire( any( Thread.class ), any( BoltServerAddress.class ), sameInstance( connection ) ); - } - - public static Matcher acquire( - final Matcher thread, - final Matcher address, - final Matcher connection ) - { - return new TypeSafeMatcher() - { - @Override - public void describeTo( Description description ) - { - description.appendText( "acquire event on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> of address <" ) - .appendDescriptionOf( address ) - .appendText( "> resulting in connection <" ) - .appendDescriptionOf( connection ) - .appendText( ">" ); - } - - @Override - protected boolean matchesSafely( AcquireEvent event ) - { - return thread.matches( event.thread ) && - address.matches( event.address ) && - connection.matches( event.connection ); - } - }; - } - - public static Matcher release( - final Matcher thread, - final Matcher address, - final Matcher connection ) - { - return new TypeSafeMatcher() - { - @Override - public void describeTo( Description description ) - { - description.appendText( "release event on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> of address <" ) - .appendDescriptionOf( address ) - .appendText( "> and connection <" ) - .appendDescriptionOf( connection ) - .appendText( ">" ); - } - - @Override - protected boolean matchesSafely( ReleaseEvent event ) - { - return thread.matches( event.thread ) && - address.matches( event.address ) && - connection.matches( event.connection ); - } - }; - } - - public static Matcher connectionFailure( String host, int port ) - { - return connectionFailure( any( Thread.class ), equalTo( new BoltServerAddress( host, port ) ) ); - } - - public static Matcher connectionFailure( - final Matcher thread, - final Matcher address ) - { - return new TypeSafeMatcher() - { - @Override - public void describeTo( Description description ) - { - description.appendText( "connection failure event on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> of address <" ) - .appendDescriptionOf( address ) - .appendText( ">" ); - } - - @Override - protected boolean matchesSafely( ConnectionFailureEvent event ) - { - return thread.matches( event.thread ) && - address.matches( event.address ); - } - }; - } - - public static Matcher purge( - final Matcher thread, - final Matcher address, - final Matcher removed ) - { - return new TypeSafeMatcher() - { - @Override - public void describeTo( Description description ) - { - description.appendText( "purge event on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> of address <" ) - .appendDescriptionOf( address ) - .appendText( "> resulting in actual removal: " ) - .appendDescriptionOf( removed ); - } - - @Override - protected boolean matchesSafely( PurgeEvent event ) - { - return thread.matches( event.thread ) && - address.matches( event.address ) && - removed.matches( event.connected ); - } - }; - } - - public static Matcher close( - final Matcher thread, - final Matcher> addresses ) - { - return new TypeSafeMatcher() - { - @Override - public void describeTo( Description description ) - { - description.appendText( "close event on thread <" ) - .appendDescriptionOf( thread ) - .appendText( "> resulting in closing connections to <" ) - .appendDescriptionOf( addresses ) - .appendText( ">" ); - } - - @Override - protected boolean matchesSafely( CloseEvent event ) - { - return thread.matches( event.thread ) && addresses.matches( event.connected ); - } - }; - } - } - - private static class AcquireEvent extends Event - { - private final BoltServerAddress address; - private final Connection connection; - - AcquireEvent( Thread thread, BoltServerAddress address, Connection connection ) - { - super( thread ); - this.address = address; - this.connection = connection; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.acquire( address, connection ); - } - } - - private static class ReleaseEvent extends Event - { - private final BoltServerAddress address; - private final Connection connection; - - ReleaseEvent( Thread thread, BoltServerAddress address, Connection connection ) - { - super( thread ); - this.address = address; - this.connection = connection; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.release( address, connection ); - } - } - - private static class ConnectionFailureEvent extends Event - { - private final BoltServerAddress address; - - ConnectionFailureEvent( Thread thread, BoltServerAddress address ) - { - super( thread ); - this.address = address; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.connectionFailure( address ); - } - } - - private static class PurgeEvent extends Event - { - private final BoltServerAddress address; - private final boolean connected; - - PurgeEvent( Thread thread, BoltServerAddress address, boolean connected ) - { - super( thread ); - this.address = address; - this.connected = connected; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.purge( address, connected ); - } - } - - private static class CloseEvent extends Event - { - private final Collection connected; - - CloseEvent( Thread thread, Collection connected ) - { - super( thread ); - this.connected = connected; - } - - @Override - public void dispatch( EventSink sink ) - { - sink.close( connected ); - } - } -} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index ca010427fd..26b1715b77 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -104,7 +104,7 @@ public void sessionCreationShouldFailIfCallingDiscoveryProcedureOnEdgeServer() t } catch ( ServiceUnavailableException ex ) { - assertEquals( "Could not perform discovery. No routing servers available.", ex.getMessage() ); + assertThat( ex.getMessage(), containsString( "Failed to call 'dbms.cluster.routing.getServers'" ) ); } }