Skip to content

Commit b46f48a

Browse files
committed
Throw ServiceUnavailableException when socket write fails
Previously `ClientException` was thrown which was incorrect and resulted in connections not being properly disposed.
1 parent 1a63c80 commit b46f48a

File tree

6 files changed

+87
-20
lines changed

6 files changed

+87
-20
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class RoutingTransaction implements Transaction
4747
private final BoltServerAddress address;
4848
private final RoutingErrorHandler onError;
4949

50-
RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
50+
public RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
5151
RoutingErrorHandler onError )
5252
{
5353
this.delegate = delegate;

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ private LoadBalancer(
5858
RoutingTable routingTable,
5959
ClusterCompositionProvider provider ) throws ServiceUnavailableException
6060
{
61-
this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) );
61+
this( routingTable, connections, new Rediscovery( settings, clock, log, provider ), log );
6262
}
6363

64-
LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery )
64+
LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
6565
throws ServiceUnavailableException
6666
{
6767
this.log = log;

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.neo4j.driver.v1.Value;
3939
import org.neo4j.driver.v1.exceptions.ClientException;
4040
import org.neo4j.driver.v1.exceptions.Neo4jException;
41+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4142
import org.neo4j.driver.v1.summary.ServerInfo;
4243

4344
import static java.lang.String.format;
@@ -165,8 +166,7 @@ public synchronized void flush()
165166
}
166167
catch ( IOException e )
167168
{
168-
String message = e.getMessage();
169-
throw new ClientException( "Unable to send messages to server: " + message, e );
169+
throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e );
170170
}
171171
}
172172

driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,26 @@
2121
import org.junit.Test;
2222
import org.mockito.InOrder;
2323

24-
import java.util.HashSet;
24+
import java.util.Collections;
25+
import java.util.Set;
2526
import java.util.concurrent.atomic.AtomicInteger;
2627

28+
import org.neo4j.driver.internal.RoutingTransaction;
2729
import org.neo4j.driver.internal.net.BoltServerAddress;
2830
import org.neo4j.driver.internal.spi.Connection;
2931
import org.neo4j.driver.internal.spi.ConnectionPool;
32+
import org.neo4j.driver.v1.AccessMode;
33+
import org.neo4j.driver.v1.Transaction;
34+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
35+
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3036

31-
import static java.util.Arrays.asList;
3237
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static org.hamcrest.Matchers.instanceOf;
3339
import static org.hamcrest.core.IsEqual.equalTo;
40+
import static org.junit.Assert.assertNotNull;
41+
import static org.junit.Assert.fail;
3442
import static org.mockito.Matchers.any;
43+
import static org.mockito.Mockito.doThrow;
3544
import static org.mockito.Mockito.inOrder;
3645
import static org.mockito.Mockito.mock;
3746
import static org.mockito.Mockito.spy;
@@ -49,27 +58,28 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
4958
RoutingTable routingTable = mock( RoutingTable.class );
5059
Rediscovery rediscovery = mock( Rediscovery.class );
5160
when( routingTable.isStale() ).thenReturn( true );
52-
HashSet<BoltServerAddress> set = new HashSet<>( asList( new BoltServerAddress( "abc", 12 ) ) );
61+
Set<BoltServerAddress> set = Collections.singleton( new BoltServerAddress( "abc", 12 ) );
5362
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );
5463

5564
// when
56-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, conns, routingTable, rediscovery );
65+
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );
5766

5867
// then
68+
assertNotNull( balancer );
5969
InOrder inOrder = inOrder( rediscovery, routingTable, conns );
6070
inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable );
6171
inOrder.verify( routingTable ).update( any( ClusterComposition.class ) );
6272
inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) );
6373
}
6474

65-
6675
@Test
6776
public void shouldEnsureRoutingOnInitialization() throws Exception
6877
{
6978
// given & when
7079
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
71-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, mock( ConnectionPool.class ),
72-
mock( RoutingTable.class ), mock( Rediscovery.class ) ) {
80+
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
81+
mock( Rediscovery.class ), DEV_NULL_LOGGER )
82+
{
7383
@Override
7484
public void ensureRouting()
7585
{
@@ -78,6 +88,7 @@ public void ensureRouting()
7888
};
7989

8090
// then
91+
assertNotNull( balancer );
8192
assertThat( ensureRoutingCounter.get(), equalTo( 1 ) );
8293
}
8394

@@ -129,9 +140,36 @@ private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readCo
129140
when( routingTable.readers() ).thenReturn( readerAddrs );
130141
when( routingTable.writers() ).thenReturn( writerAddrs );
131142

132-
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, connPool,
133-
routingTable, mock( Rediscovery.class ) ) ;
143+
return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
144+
}
134145

135-
return balancer;
146+
@Test
147+
public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable()
148+
{
149+
Transaction tx = mock( Transaction.class );
150+
RoutingTable routingTable = mock( RoutingTable.class );
151+
ConnectionPool connectionPool = mock( ConnectionPool.class );
152+
Rediscovery rediscovery = mock( Rediscovery.class );
153+
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
154+
BoltServerAddress address = new BoltServerAddress( "host", 42 );
155+
156+
RoutingTransaction routingTx = new RoutingTransaction( tx, AccessMode.WRITE, address, loadBalancer );
157+
158+
ServiceUnavailableException txCloseError = new ServiceUnavailableException( "Oh!" );
159+
doThrow( txCloseError ).when( tx ).close();
160+
161+
try
162+
{
163+
routingTx.close();
164+
fail( "Exception expected" );
165+
}
166+
catch ( Exception e )
167+
{
168+
assertThat( e, instanceOf( SessionExpiredException.class ) );
169+
assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) );
170+
}
171+
172+
verify( routingTable ).forget( address );
173+
verify( connectionPool ).purge( address );
136174
}
137175
}

driver/src/test/java/org/neo4j/driver/internal/net/SocketConnectionTest.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,22 @@
2222
import org.mockito.invocation.InvocationOnMock;
2323
import org.mockito.stubbing.Answer;
2424

25+
import java.io.IOException;
2526
import java.net.URI;
2627
import java.util.ArrayList;
2728
import java.util.Iterator;
29+
import java.util.Queue;
2830

2931
import org.neo4j.driver.internal.messaging.Message;
3032
import org.neo4j.driver.internal.messaging.SuccessMessage;
3133
import org.neo4j.driver.internal.summary.InternalServerInfo;
32-
import org.neo4j.driver.v1.Logger;
3334
import org.neo4j.driver.v1.Values;
35+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3436
import org.neo4j.driver.v1.summary.ServerInfo;
3537

3638
import static org.hamcrest.CoreMatchers.equalTo;
3739
import static org.hamcrest.CoreMatchers.instanceOf;
40+
import static org.junit.Assert.assertSame;
3841
import static org.junit.Assert.assertThat;
3942
import static org.junit.Assert.fail;
4043
import static org.mockito.Matchers.any;
@@ -45,16 +48,20 @@
4548
import static org.mockito.Mockito.times;
4649
import static org.mockito.Mockito.verify;
4750
import static org.mockito.Mockito.when;
51+
import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER;
52+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
4853
import static org.neo4j.driver.v1.Values.parameters;
4954

5055
public class SocketConnectionTest
5156
{
57+
private static final InternalServerInfo SERVER_INFO = new InternalServerInfo( LOCAL_DEFAULT, "test" );
58+
5259
@Test
5360
public void shouldReceiveServerInfoAfterInit() throws Throwable
5461
{
5562
// Given
5663
SocketClient socket = mock( SocketClient.class );
57-
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
64+
SocketConnection conn = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
5865

5966
when( socket.address() ).thenReturn( BoltServerAddress.from( URI.create( "http://neo4j.com:9000" ) ) );
6067

@@ -98,7 +105,7 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
98105
// Then
99106
try
100107
{
101-
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
108+
new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
102109
fail( "should have failed with the provided exception" );
103110
}
104111
catch( Throwable e )
@@ -108,4 +115,26 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
108115
}
109116
verify( socket, times( 1 ) ).stop();
110117
}
118+
119+
@Test
120+
@SuppressWarnings( "unchecked" )
121+
public void flushThrowsWhenSocketIsBroken() throws Exception
122+
{
123+
SocketClient socket = mock( SocketClient.class );
124+
IOException sendError = new IOException( "Unable to send" );
125+
doThrow( sendError ).when( socket ).send( any( Queue.class ) );
126+
127+
SocketConnection connection = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
128+
129+
try
130+
{
131+
connection.flush();
132+
fail( "Exception expected" );
133+
}
134+
catch ( Exception e )
135+
{
136+
assertThat( e, instanceOf( ServiceUnavailableException.class ) );
137+
assertSame( sendError, e.getCause() );
138+
}
139+
}
111140
}

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ public void shouldHandleGracefulLeaderSwitch() throws Exception
312312
parameters( "name", "Webber", "title", "Mr" ) );
313313
tx1.success();
314314

315-
closeAndExpectException( tx1, ClientException.class );
316-
closeAndExpectException( session1, ClientException.class );
315+
closeAndExpectException( tx1, SessionExpiredException.class );
316+
session1.close();
317317

318318
String bookmark = inExpirableSession( driver, createSession(), new Function<Session,String>()
319319
{

0 commit comments

Comments
 (0)