Skip to content

Commit 380e030

Browse files
author
Zhen
committed
Removed the invalid parameterized tag in SocketClient as the test only tests on a mocked channel
Fixed the bug where the channel might not be closed if socketClient.start() throws exceptions Added tests to verify channel is closed after construction failure and IO failures for both plaintext and encrypted channel
1 parent 3007c4a commit 380e030

15 files changed

+305
-143
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.neo4j.driver.internal.cluster;
2121

2222
import java.util.HashSet;
23+
import java.util.Set;
2324

2425
import org.neo4j.driver.internal.net.BoltServerAddress;
2526
import org.neo4j.driver.internal.util.Clock;
@@ -33,7 +34,9 @@ public class ClusterRoutingTable implements RoutingTable
3334

3435
private final Clock clock;
3536
private long expirationTimeout;
36-
private final RoundRobinAddressSet readers, writers, routers;
37+
private final RoundRobinAddressSet readers;
38+
private final RoundRobinAddressSet writers;
39+
private final RoundRobinAddressSet routers;
3740

3841
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
3942
{
@@ -52,7 +55,7 @@ private ClusterRoutingTable( Clock clock )
5255
}
5356

5457
@Override
55-
public boolean stale()
58+
public boolean isStale()
5659
{
5760
return expirationTimeout < clock.millis() || // the expiration timeout has been reached
5861
routers.size() <= MIN_ROUTERS || // we need to discover more routing servers
@@ -61,7 +64,7 @@ public boolean stale()
6164
}
6265

6366
@Override
64-
public synchronized HashSet<BoltServerAddress> update( ClusterComposition cluster )
67+
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
6568
{
6669
expirationTimeout = cluster.expirationTimestamp;
6770
HashSet<BoltServerAddress> removed = new HashSet<>();

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.HashSet;
21+
import java.util.Set;
2222

2323
import org.neo4j.driver.internal.RoutingErrorHandler;
2424
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -29,19 +29,14 @@
2929
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3030

3131
import static java.lang.String.format;
32-
import static org.neo4j.driver.internal.cluster.Rediscovery.lookupRoutingTable;
3332

3433
public final class LoadBalancer implements RoutingErrorHandler, AutoCloseable
3534
{
36-
private final RoutingSettings settings;
37-
private final Clock clock;
38-
// dependencies
3935
private final Logger log;
36+
4037
private final ConnectionPool connections;
41-
// state
42-
private final ClusterComposition.Provider provider;
4338
private final RoutingTable routingTable;
44-
39+
private final Rediscovery rediscovery;
4540

4641
public LoadBalancer(
4742
RoutingSettings settings,
@@ -54,20 +49,24 @@ public LoadBalancer(
5449
new ClusterComposition.Provider.Default( clock, log ) );
5550
}
5651

57-
LoadBalancer(
52+
private LoadBalancer(
5853
RoutingSettings settings,
5954
Clock clock,
6055
Logger log,
6156
ConnectionPool connections,
6257
RoutingTable routingTable,
6358
ClusterComposition.Provider provider ) throws ServiceUnavailableException
6459
{
65-
this.settings = settings;
66-
this.clock = clock;
60+
this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) );
61+
}
62+
63+
LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery )
64+
throws ServiceUnavailableException
65+
{
6766
this.log = log;
6867
this.connections = connections;
6968
this.routingTable = routingTable;
70-
this.provider = provider;
69+
this.rediscovery = rediscovery;
7170

7271
// initialize the routing table
7372
ensureRouting();
@@ -135,15 +134,14 @@ private synchronized void forget( BoltServerAddress address )
135134

136135
private synchronized void ensureRouting() throws ServiceUnavailableException
137136
{
138-
if ( routingTable.stale() )
137+
if ( routingTable.isStale() )
139138
{
140139
log.info( "Routing information is stale. %s", routingTable );
141140
try
142141
{
143142
// get a new routing table
144-
ClusterComposition cluster = lookupRoutingTable( settings, clock, log,
145-
connections, routingTable, provider );
146-
HashSet<BoltServerAddress> removed = routingTable.update( cluster );
143+
ClusterComposition cluster = rediscovery.lookupRoutingTable( connections, routingTable );
144+
Set<BoltServerAddress> removed = routingTable.update( cluster );
147145
// purge connections to removed addresses
148146
for ( BoltServerAddress address : removed )
149147
{

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,22 @@ public class Rediscovery
3131
{
3232
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available.";
3333

34-
// Given a old routing table and connection pool, use the connection composition provider to obtain a new
34+
private final RoutingSettings settings;
35+
private final Clock clock;
36+
private final Logger logger;
37+
private final ClusterComposition.Provider provider;
38+
39+
public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterComposition.Provider provider )
40+
{
41+
this.settings = settings;
42+
this.clock = clock;
43+
this.logger = logger;
44+
this.provider = provider;
45+
}
46+
47+
// Given the current routing table and connection pool, use the connection composition provider to fetch a new
3548
// cluster composition, which would be used to update the routing table and connection pool
36-
public static ClusterComposition lookupRoutingTable(
37-
RoutingSettings settings, Clock clock, Logger log,
38-
ConnectionPool connections, RoutingTable routingTable,
39-
ClusterComposition.Provider provider )
49+
public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable )
4050
throws InterruptedException, ServiceUnavailableException
4151
{
4252
int size = routingTable.routerSize(), failures = 0;
@@ -63,17 +73,16 @@ public static ClusterComposition lookupRoutingTable(
6373
try ( Connection connection = connections.acquire( address ) )
6474
{
6575
cluster = provider.getClusterComposition( connection );
66-
log.info( "Got cluster composition %s", cluster );
76+
logger.info( "Got cluster composition %s", cluster );
6777
}
6878
catch ( Exception e )
6979
{
70-
log.error( format( "Failed to connect to routing server '%s'.", address ), e );
80+
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
7181
continue;
7282
}
7383
if ( cluster == null || !cluster.isValid() )
7484
{
75-
log.info(
76-
"Server <%s> unable to perform routing capability, dropping from list of routers.",
85+
logger.info( "Server <%s> unable to perform routing capability, dropping from list of routers.",
7786
address );
7887
routingTable.removeRouter( address );
7988
if ( --size == 0 )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.HashSet;
21+
import java.util.Set;
2222

2323
import org.neo4j.driver.internal.net.BoltServerAddress;
2424

2525
public interface RoutingTable
2626
{
27-
boolean stale();
27+
boolean isStale();
2828

29-
HashSet<BoltServerAddress> update( ClusterComposition cluster );
29+
Set<BoltServerAddress> update( ClusterComposition cluster );
3030

3131
void forget( BoltServerAddress address );
3232

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void stop()
203203
}
204204
else
205205
{
206-
throw new ClientException( "Unable to close socket connection properly." + e.getMessage(), e );
206+
logger.warn( "Unable to close socket connection properly: '" + e.getMessage() + "'", e );
207207
}
208208
}
209209
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,13 @@ public class SocketConnection implements Connection
6060

6161
public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging )
6262
{
63-
this.logger = logging.getLog( format( "conn-%s", UUID.randomUUID().toString() ) );
64-
this.socket = new SocketClient( address, securityPlan, timeoutMillis, logger );
65-
this.responseHandler = createResponseHandler( logger );
66-
this.socket.start();
63+
this( address, securityPlan, timeoutMillis,
64+
logging.getLog( format( "conn-%s", UUID.randomUUID().toString() ) ) );
65+
}
66+
67+
private SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
68+
{
69+
this( new SocketClient( address, securityPlan, timeoutMillis, logger ), null, logger );
6770
}
6871

6972
/**
@@ -81,7 +84,16 @@ public SocketConnection( SocketClient socket, InternalServerInfo serverInfo, Log
8184
this.serverInfo = serverInfo;
8285
this.logger = logger;
8386
this.responseHandler = createResponseHandler( logger );
84-
this.socket.start();
87+
88+
try
89+
{
90+
this.socket.start();
91+
}
92+
catch ( Throwable e )
93+
{
94+
close();
95+
throw e;
96+
}
8597
}
8698

8799
private SocketResponseHandler createResponseHandler( Logger logger )

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnector.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ public SocketConnector( ConnectionSettings connectionSettings, SecurityPlan secu
4747
@Override
4848
public final Connection connect( BoltServerAddress address )
4949
{
50-
Connection connection = createConnection( address, securityPlan, connectionSettings.timeoutMillis(),
51-
logging );
50+
Connection connection = createConnection( address, securityPlan, connectionSettings.timeoutMillis(), logging );
5251

5352
// Because SocketConnection is not thread safe, wrap it in this guard
5453
// to ensure concurrent access leads causes application errors

0 commit comments

Comments
 (0)