Skip to content

Commit b3229dc

Browse files
committed
Configurable test-on-borrow for pooled connections
Connections are pooled for both direct and routing driver. Each acquired session contains a socket connection which was acquired from the pool. It is possible for an idle pooled connection to become invalid while just resting in the pool. This could happen when connection is terminated after some timeout by a load balancer or some other network facility. This commit introduces a configurable connection liveness check timeout. So freshly acquired connection, that has been idle in the pool for more than configurable timeout, will be validated by sending a RESET message. If it appears to be broken acquisition will retry until a valid connection is found.
1 parent 889d377 commit b3229dc

20 files changed

+1010
-161
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.v1.Config;
3737
import org.neo4j.driver.v1.Driver;
3838
import org.neo4j.driver.v1.Logger;
39+
import org.neo4j.driver.v1.Logging;
3940
import org.neo4j.driver.v1.exceptions.ClientException;
4041

4142
import static java.lang.String.format;
@@ -91,40 +92,62 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio
9192
/**
9293
* Creates new {@link DirectDriver}.
9394
* <p>
94-
* <b>This method is package-private only for testing</b>
95+
* <b>This method is protected only for testing</b>
9596
*/
96-
DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
97-
SecurityPlan securityPlan, SessionFactory sessionFactory )
97+
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
98+
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
9899
{
99100
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
100101
}
101102

102103
/**
103104
* Creates new {@link RoutingDriver}.
104105
* <p>
105-
* <b>This method is package-private only for testing</b>
106+
* <b>This method is protected only for testing</b>
106107
*/
107-
RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
108-
RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
108+
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
109+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
109110
{
110111
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
111-
Clock.SYSTEM, config.logging() );
112+
createClock(), config.logging() );
112113
}
113114

114115
/**
115116
* Creates new {@link ConnectionPool}.
116117
* <p>
117-
* <b>This method is package-private only for testing</b>
118+
* <b>This method is protected only for testing</b>
118119
*/
119-
ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
120+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
120121
{
121122
authToken = authToken == null ? AuthTokens.none() : authToken;
122123

123124
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
124-
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize() );
125-
Connector connector = new SocketConnector( connectionSettings, securityPlan, config.logging() );
125+
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
126+
config.idleTimeBeforeConnectionTest() );
127+
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );
126128

127-
return new SocketConnectionPool( poolSettings, connector, Clock.SYSTEM, config.logging() );
129+
return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
130+
}
131+
132+
/**
133+
* Creates new {@link Clock}.
134+
* <p>
135+
* <b>This method is protected only for testing</b>
136+
*/
137+
protected Clock createClock()
138+
{
139+
return Clock.SYSTEM;
140+
}
141+
142+
/**
143+
* Creates new {@link Connector}.
144+
* <p>
145+
* <b>This method is protected only for testing</b>
146+
*/
147+
protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
148+
Logging logging )
149+
{
150+
return new SocketConnector( connectionSettings, securityPlan, logging );
128151
}
129152

130153
private static SessionFactory createSessionFactory( Config config )

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,37 @@
2020

2121
public class PoolSettings
2222
{
23+
public static final int NO_IDLE_CONNECTION_TEST = -1;
24+
2325
public static final int DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE = 10;
26+
public static final int DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST = NO_IDLE_CONNECTION_TEST;
2427

25-
/**
26-
* Maximum number of idle connections per pool.
27-
*/
2828
private final int maxIdleConnectionPoolSize;
29+
private final long idleTimeBeforeConnectionTest;
2930

30-
public PoolSettings( int maxIdleConnectionPoolSize )
31+
public PoolSettings( int maxIdleConnectionPoolSize, long idleTimeBeforeConnectionTest )
3132
{
3233
this.maxIdleConnectionPoolSize = maxIdleConnectionPoolSize;
34+
this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTest;
3335
}
3436

3537
public int maxIdleConnectionPoolSize()
3638
{
3739
return maxIdleConnectionPoolSize;
3840
}
41+
42+
public long idleTimeBeforeConnectionTest()
43+
{
44+
if ( !idleTimeBeforeConnectionTestConfigured() )
45+
{
46+
throw new IllegalStateException(
47+
"Idle time before connection test is not configured: " + idleTimeBeforeConnectionTest );
48+
}
49+
return idleTimeBeforeConnectionTest;
50+
}
51+
52+
public boolean idleTimeBeforeConnectionTestConfigured()
53+
{
54+
return idleTimeBeforeConnectionTest > 0;
55+
}
3956
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,14 @@ public class PooledConnection implements Connection
5959
private boolean unrecoverableErrorsOccurred = false;
6060
private Runnable onError = null;
6161
private final Clock clock;
62-
private long lastUsed;
62+
private long lastUsedTimestamp;
6363

6464
public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
6565
{
6666
this.delegate = delegate;
6767
this.release = release;
6868
this.clock = clock;
69-
this.lastUsed = clock.millis();
70-
}
71-
72-
public void updateTimestamp()
73-
{
74-
lastUsed = clock.millis();
69+
updateLastUsedTimestamp();
7570
}
7671

7772
@Override
@@ -192,13 +187,14 @@ public void receiveOne()
192187
}
193188
}
194189

195-
@Override
196190
/**
197191
* Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a.
198192
* adding back the connection twice into the pool.
199193
*/
194+
@Override
200195
public void close()
201196
{
197+
updateLastUsedTimestamp();
202198
release.accept( this );
203199
// put the full logic of deciding whether to dispose the connection or to put it back to
204200
// the pool into the release object
@@ -286,6 +282,11 @@ public void onError( Runnable runnable )
286282
this.onError = runnable;
287283
}
288284

285+
public long lastUsedTimestamp()
286+
{
287+
return lastUsedTimestamp;
288+
}
289+
289290
private boolean isProtocolViolationError(RuntimeException e )
290291
{
291292
return e instanceof Neo4jException
@@ -300,8 +301,8 @@ private boolean isClientOrTransientError( RuntimeException e )
300301
|| ((Neo4jException) e).code().contains( "TransientError" ));
301302
}
302303

303-
public long idleTime()
304+
private void updateLastUsedTimestamp()
304305
{
305-
return clock.millis() - lastUsed;
306+
this.lastUsedTimestamp = clock.millis();
306307
}
307308
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
22-
21+
import org.neo4j.driver.internal.spi.ConnectionValidator;
2322
import org.neo4j.driver.internal.util.Consumer;
24-
import org.neo4j.driver.v1.util.Function;
2523

2624
/**
2725
* The responsibility of the PooledConnectionReleaseConsumer is to release valid connections
@@ -30,19 +28,19 @@
3028
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3129
{
3230
private final BlockingPooledConnectionQueue connections;
33-
private final Function<PooledConnection, Boolean> validConnection;
31+
private final ConnectionValidator<PooledConnection> connectionValidator;
3432

3533
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
36-
Function<PooledConnection, Boolean> validConnection)
34+
ConnectionValidator<PooledConnection> connectionValidator )
3735
{
3836
this.connections = connections;
39-
this.validConnection = validConnection;
37+
this.connectionValidator = connectionValidator;
4038
}
4139

4240
@Override
4341
public void accept( PooledConnection pooledConnection )
4442
{
45-
if ( validConnection.apply( pooledConnection ) )
43+
if ( connectionValidator.isReusable( pooledConnection ) )
4644
{
4745
connections.offer( pooledConnection );
4846
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.neo4j.driver.internal.net.pooling;
2020

2121
import org.neo4j.driver.internal.spi.ConnectionPool;
22-
import org.neo4j.driver.v1.util.Function;
22+
import org.neo4j.driver.internal.spi.ConnectionValidator;
2323

24-
class PooledConnectionValidator implements Function<PooledConnection,Boolean>
24+
class PooledConnectionValidator implements ConnectionValidator<PooledConnection>
2525
{
2626
private final ConnectionPool pool;
2727

@@ -31,28 +31,25 @@ class PooledConnectionValidator implements Function<PooledConnection,Boolean>
3131
}
3232

3333
@Override
34-
public Boolean apply( PooledConnection pooledConnection )
34+
public boolean isReusable( PooledConnection pooledConnection )
3535
{
3636
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
3737
// and we should close the conn without bothering to reset the conn at all
3838
return pool.hasAddress( pooledConnection.boltServerAddress() ) &&
3939
!pooledConnection.hasUnrecoverableErrors() &&
40-
reset( pooledConnection );
40+
isConnected( pooledConnection );
4141
}
4242

43-
/**
44-
* In case this session has an open result or transaction or something,
45-
* make sure it's reset to a nice state before we reuse it.
46-
*
47-
* @param conn the PooledConnection
48-
* @return true if the connection is reset successfully without any error, otherwise false.
49-
*/
50-
private static boolean reset( PooledConnection conn )
43+
@Override
44+
public boolean isConnected( PooledConnection connection )
5145
{
5246
try
5347
{
54-
conn.reset();
55-
conn.sync();
48+
// try to use this connection for RESET message
49+
// in case this session has an open result or transaction or something,
50+
// make sure it's reset to a nice state before we reuse it.
51+
connection.reset();
52+
connection.sync();
5653
return true;
5754
}
5855
catch ( Throwable e )

0 commit comments

Comments
 (0)