diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java index 1d54925185..42f21f69c5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java @@ -47,7 +47,7 @@ public class RoutingTransaction implements Transaction private final BoltServerAddress address; private final RoutingErrorHandler onError; - RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address, + public RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address, RoutingErrorHandler onError ) { this.delegate = delegate; diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index 3f39469f75..997b32e0c2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -58,10 +58,10 @@ private LoadBalancer( RoutingTable routingTable, ClusterCompositionProvider provider ) throws ServiceUnavailableException { - this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) ); + this( routingTable, connections, new Rediscovery( settings, clock, log, provider ), log ); } - LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery ) + LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log ) throws ServiceUnavailableException { this.log = log; diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java index 024004d565..313059df95 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java @@ -29,8 +29,6 @@ interface Writer Writer write( Message msg ) throws IOException; Writer flush() throws IOException; - - Writer reset( WritableByteChannel channel ); } interface Reader diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java index 20752cb330..14dfb36432 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java @@ -345,13 +345,6 @@ public Writer write( Message msg ) throws IOException return this; } - @Override - public Writer reset( WritableByteChannel channel ) - { - packer.reset( channel ); - return this; - } - private void packNode( Node node ) throws IOException { packer.packStructHeader( NODE_FIELDS, NODE ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 567d0b8f66..cde937b0c6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -38,6 +38,7 @@ import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.summary.ServerInfo; import static java.lang.String.format; @@ -165,8 +166,7 @@ public synchronized void flush() } catch ( IOException e ) { - String message = e.getMessage(); - throw new ClientException( "Unable to send messages to server: " + message, e ); + throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java index a5104c923c..dbb12281ae 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java +++ b/driver/src/main/java/org/neo4j/driver/internal/packstream/PackStream.java @@ -19,7 +19,6 @@ package org.neo4j.driver.internal.packstream; import java.io.IOException; -import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -158,16 +157,6 @@ public Packer( PackOutput out ) this.out = out; } - public void reset( PackOutput out ) - { - this.out = out; - } - - public void reset( WritableByteChannel channel ) - { - ((BufferedChannelOutput) out).reset( channel ); - } - public void flush() throws IOException { out.flush(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 84ad2c4ec7..66b1da4d95 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -21,17 +21,26 @@ import org.junit.Test; import org.mockito.InOrder; -import java.util.HashSet; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.RoutingTransaction; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; -import static java.util.Arrays.asList; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -49,27 +58,28 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale RoutingTable routingTable = mock( RoutingTable.class ); Rediscovery rediscovery = mock( Rediscovery.class ); when( routingTable.isStale() ).thenReturn( true ); - HashSet set = new HashSet<>( asList( new BoltServerAddress( "abc", 12 ) ) ); + Set set = Collections.singleton( new BoltServerAddress( "abc", 12 ) ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set ); // when - LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, conns, routingTable, rediscovery ); + LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER ); // then + assertNotNull( balancer ); InOrder inOrder = inOrder( rediscovery, routingTable, conns ); inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable ); inOrder.verify( routingTable ).update( any( ClusterComposition.class ) ); inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) ); } - @Test public void shouldEnsureRoutingOnInitialization() throws Exception { // given & when final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 ); - LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, mock( ConnectionPool.class ), - mock( RoutingTable.class ), mock( Rediscovery.class ) ) { + LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ), + mock( Rediscovery.class ), DEV_NULL_LOGGER ) + { @Override public void ensureRouting() { @@ -78,6 +88,7 @@ public void ensureRouting() }; // then + assertNotNull( balancer ); assertThat( ensureRoutingCounter.get(), equalTo( 1 ) ); } @@ -129,9 +140,36 @@ private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readCo when( routingTable.readers() ).thenReturn( readerAddrs ); when( routingTable.writers() ).thenReturn( writerAddrs ); - LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, connPool, - routingTable, mock( Rediscovery.class ) ) ; + return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER ); + } - return balancer; + @Test + public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable() + { + Transaction tx = mock( Transaction.class ); + RoutingTable routingTable = mock( RoutingTable.class ); + ConnectionPool connectionPool = mock( ConnectionPool.class ); + Rediscovery rediscovery = mock( Rediscovery.class ); + LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER ); + BoltServerAddress address = new BoltServerAddress( "host", 42 ); + + RoutingTransaction routingTx = new RoutingTransaction( tx, AccessMode.WRITE, address, loadBalancer ); + + ServiceUnavailableException txCloseError = new ServiceUnavailableException( "Oh!" ); + doThrow( txCloseError ).when( tx ).close(); + + try + { + routingTx.close(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); + } + + verify( routingTable ).forget( address ); + verify( connectionPool ).purge( address ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java index f3780d90ed..b60db2748f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java @@ -22,19 +22,22 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Iterator; +import java.util.Queue; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.SuccessMessage; import org.neo4j.driver.internal.summary.InternalServerInfo; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Values; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.summary.ServerInfo; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -45,16 +48,20 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; +import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.v1.Values.parameters; public class SocketConnectionTest { + private static final InternalServerInfo SERVER_INFO = new InternalServerInfo( LOCAL_DEFAULT, "test" ); + @Test public void shouldReceiveServerInfoAfterInit() throws Throwable { // Given SocketClient socket = mock( SocketClient.class ); - SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) ); + SocketConnection conn = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER ); when( socket.address() ).thenReturn( BoltServerAddress.from( URI.create( "http://neo4j.com:9000" ) ) ); @@ -98,7 +105,7 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable // Then try { - SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) ); + new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER ); fail( "should have failed with the provided exception" ); } catch( Throwable e ) @@ -108,4 +115,26 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable } verify( socket, times( 1 ) ).stop(); } + + @Test + @SuppressWarnings( "unchecked" ) + public void flushThrowsWhenSocketIsBroken() throws Exception + { + SocketClient socket = mock( SocketClient.class ); + IOException sendError = new IOException( "Unable to send" ); + doThrow( sendError ).when( socket ).send( any( Queue.class ) ); + + SocketConnection connection = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER ); + + try + { + connection.flush(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertSame( sendError, e.getCause() ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java similarity index 96% rename from driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java rename to driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java index 1a3db0786a..e01585236a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelInput.java @@ -40,15 +40,8 @@ public BufferedChannelInput(ReadableByteChannel ch ) public BufferedChannelInput( int bufferCapacity, ReadableByteChannel ch ) { this.buffer = ByteBuffer.allocate( bufferCapacity ).order( ByteOrder.BIG_ENDIAN ); - reset( ch ); - } - - public BufferedChannelInput reset( ReadableByteChannel ch ) - { - this.channel = ch; - this.buffer.position( 0 ); this.buffer.limit( 0 ); - return this; + this.channel = ch; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java similarity index 93% rename from driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java rename to driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java index cde8fe8f64..70bc8ff29e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java +++ b/driver/src/test/java/org/neo4j/driver/internal/packstream/BufferedChannelOutput.java @@ -26,12 +26,7 @@ public class BufferedChannelOutput implements PackOutput { private final ByteBuffer buffer; - private WritableByteChannel channel; - - public BufferedChannelOutput( int bufferSize ) - { - this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN ); - } + private final WritableByteChannel channel; public BufferedChannelOutput( WritableByteChannel channel ) { @@ -40,12 +35,7 @@ public BufferedChannelOutput( WritableByteChannel channel ) public BufferedChannelOutput( WritableByteChannel channel, int bufferSize ) { - this( bufferSize ); - reset( channel ); - } - - public void reset( WritableByteChannel channel ) - { + this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN ); this.channel = channel; } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index 26b1715b77..b78a7bd30c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -312,8 +312,8 @@ public void shouldHandleGracefulLeaderSwitch() throws Exception parameters( "name", "Webber", "title", "Mr" ) ); tx1.success(); - closeAndExpectException( tx1, ClientException.class ); - closeAndExpectException( session1, ClientException.class ); + closeAndExpectException( tx1, SessionExpiredException.class ); + session1.close(); String bookmark = inExpirableSession( driver, createSession(), new Function() {