Skip to content

Commit 5723b6e

Browse files
author
Zhen Li
authored
Merge pull request #297 from lutovich/1.1-idle-connection-refresh
Configurable test-on-borrow for pooled connections
2 parents 2622244 + d916de5 commit 5723b6e

21 files changed

+1006
-168
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.v1.Config;
3737
import org.neo4j.driver.v1.Driver;
3838
import org.neo4j.driver.v1.Logger;
39+
import org.neo4j.driver.v1.Logging;
3940
import org.neo4j.driver.v1.exceptions.ClientException;
4041

4142
import static java.lang.String.format;
@@ -91,40 +92,62 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio
9192
/**
9293
* Creates new {@link DirectDriver}.
9394
* <p>
94-
* <b>This method is package-private only for testing</b>
95+
* <b>This method is protected only for testing</b>
9596
*/
96-
DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
97-
SecurityPlan securityPlan, SessionFactory sessionFactory )
97+
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
98+
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
9899
{
99100
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
100101
}
101102

102103
/**
103104
* Creates new {@link RoutingDriver}.
104105
* <p>
105-
* <b>This method is package-private only for testing</b>
106+
* <b>This method is protected only for testing</b>
106107
*/
107-
RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
108-
RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
108+
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
109+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
109110
{
110111
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
111-
Clock.SYSTEM, config.logging() );
112+
createClock(), config.logging() );
112113
}
113114

114115
/**
115116
* Creates new {@link ConnectionPool}.
116117
* <p>
117-
* <b>This method is package-private only for testing</b>
118+
* <b>This method is protected only for testing</b>
118119
*/
119-
ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
120+
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
120121
{
121122
authToken = authToken == null ? AuthTokens.none() : authToken;
122123

123124
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
124-
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize() );
125-
Connector connector = new SocketConnector( connectionSettings, securityPlan, config.logging() );
125+
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
126+
config.idleTimeBeforeConnectionTest() );
127+
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );
126128

127-
return new SocketConnectionPool( poolSettings, connector, Clock.SYSTEM, config.logging() );
129+
return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
130+
}
131+
132+
/**
133+
* Creates new {@link Clock}.
134+
* <p>
135+
* <b>This method is protected only for testing</b>
136+
*/
137+
protected Clock createClock()
138+
{
139+
return Clock.SYSTEM;
140+
}
141+
142+
/**
143+
* Creates new {@link Connector}.
144+
* <p>
145+
* <b>This method is protected only for testing</b>
146+
*/
147+
protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
148+
Logging logging )
149+
{
150+
return new SocketConnector( connectionSettings, securityPlan, logging );
128151
}
129152

130153
private static SessionFactory createSessionFactory( Config config )

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,37 @@
2020

2121
public class PoolSettings
2222
{
23+
public static final int NO_IDLE_CONNECTION_TEST = -1;
24+
2325
public static final int DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE = 10;
26+
public static final int DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST = NO_IDLE_CONNECTION_TEST;
2427

25-
/**
26-
* Maximum number of idle connections per pool.
27-
*/
2828
private final int maxIdleConnectionPoolSize;
29+
private final long idleTimeBeforeConnectionTest;
2930

30-
public PoolSettings( int maxIdleConnectionPoolSize )
31+
public PoolSettings( int maxIdleConnectionPoolSize, long idleTimeBeforeConnectionTest )
3132
{
3233
this.maxIdleConnectionPoolSize = maxIdleConnectionPoolSize;
34+
this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTest;
3335
}
3436

3537
public int maxIdleConnectionPoolSize()
3638
{
3739
return maxIdleConnectionPoolSize;
3840
}
41+
42+
public long idleTimeBeforeConnectionTest()
43+
{
44+
if ( !idleTimeBeforeConnectionTestConfigured() )
45+
{
46+
throw new IllegalStateException(
47+
"Idle time before connection test is not configured: " + idleTimeBeforeConnectionTest );
48+
}
49+
return idleTimeBeforeConnectionTest;
50+
}
51+
52+
public boolean idleTimeBeforeConnectionTestConfigured()
53+
{
54+
return idleTimeBeforeConnectionTest >= 0;
55+
}
3956
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,14 @@ public class PooledConnection implements Connection
5959
private boolean unrecoverableErrorsOccurred = false;
6060
private Runnable onError = null;
6161
private final Clock clock;
62-
private long lastUsed;
62+
private long lastUsedTimestamp;
6363

6464
public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
6565
{
6666
this.delegate = delegate;
6767
this.release = release;
6868
this.clock = clock;
69-
this.lastUsed = clock.millis();
70-
}
71-
72-
public void updateTimestamp()
73-
{
74-
lastUsed = clock.millis();
69+
updateLastUsedTimestamp();
7570
}
7671

7772
@Override
@@ -192,13 +187,14 @@ public void receiveOne()
192187
}
193188
}
194189

195-
@Override
196190
/**
197191
* Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a.
198192
* adding back the connection twice into the pool.
199193
*/
194+
@Override
200195
public void close()
201196
{
197+
updateLastUsedTimestamp();
202198
release.accept( this );
203199
// put the full logic of deciding whether to dispose the connection or to put it back to
204200
// the pool into the release object
@@ -286,6 +282,11 @@ public void onError( Runnable runnable )
286282
this.onError = runnable;
287283
}
288284

285+
public long lastUsedTimestamp()
286+
{
287+
return lastUsedTimestamp;
288+
}
289+
289290
private boolean isProtocolViolationError(RuntimeException e )
290291
{
291292
return e instanceof Neo4jException
@@ -300,8 +301,8 @@ private boolean isClientOrTransientError( RuntimeException e )
300301
|| ((Neo4jException) e).code().contains( "TransientError" ));
301302
}
302303

303-
public long idleTime()
304+
private void updateLastUsedTimestamp()
304305
{
305-
return clock.millis() - lastUsed;
306+
this.lastUsedTimestamp = clock.millis();
306307
}
307308
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
22-
21+
import org.neo4j.driver.internal.spi.ConnectionValidator;
2322
import org.neo4j.driver.internal.util.Consumer;
24-
import org.neo4j.driver.v1.util.Function;
2523

2624
/**
2725
* The responsibility of the PooledConnectionReleaseConsumer is to release valid connections
@@ -30,19 +28,19 @@
3028
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3129
{
3230
private final BlockingPooledConnectionQueue connections;
33-
private final Function<PooledConnection, Boolean> validConnection;
31+
private final ConnectionValidator<PooledConnection> connectionValidator;
3432

3533
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
36-
Function<PooledConnection, Boolean> validConnection)
34+
ConnectionValidator<PooledConnection> connectionValidator )
3735
{
3836
this.connections = connections;
39-
this.validConnection = validConnection;
37+
this.connectionValidator = connectionValidator;
4038
}
4139

4240
@Override
4341
public void accept( PooledConnection pooledConnection )
4442
{
45-
if ( validConnection.apply( pooledConnection ) )
43+
if ( connectionValidator.isReusable( pooledConnection ) )
4644
{
4745
connections.offer( pooledConnection );
4846
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.neo4j.driver.internal.net.pooling;
2020

2121
import org.neo4j.driver.internal.spi.ConnectionPool;
22-
import org.neo4j.driver.v1.util.Function;
22+
import org.neo4j.driver.internal.spi.ConnectionValidator;
2323

24-
class PooledConnectionValidator implements Function<PooledConnection,Boolean>
24+
class PooledConnectionValidator implements ConnectionValidator<PooledConnection>
2525
{
2626
private final ConnectionPool pool;
2727

@@ -31,28 +31,25 @@ class PooledConnectionValidator implements Function<PooledConnection,Boolean>
3131
}
3232

3333
@Override
34-
public Boolean apply( PooledConnection pooledConnection )
34+
public boolean isReusable( PooledConnection pooledConnection )
3535
{
3636
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
3737
// and we should close the conn without bothering to reset the conn at all
3838
return pool.hasAddress( pooledConnection.boltServerAddress() ) &&
3939
!pooledConnection.hasUnrecoverableErrors() &&
40-
reset( pooledConnection );
40+
isConnected( pooledConnection );
4141
}
4242

43-
/**
44-
* In case this session has an open result or transaction or something,
45-
* make sure it's reset to a nice state before we reuse it.
46-
*
47-
* @param conn the PooledConnection
48-
* @return true if the connection is reset successfully without any error, otherwise false.
49-
*/
50-
private static boolean reset( PooledConnection conn )
43+
@Override
44+
public boolean isConnected( PooledConnection connection )
5145
{
5246
try
5347
{
54-
conn.reset();
55-
conn.sync();
48+
// try to use this connection for RESET message
49+
// in case this session has an open result or transaction or something,
50+
// make sure it's reset to a nice state before we reuse it.
51+
connection.reset();
52+
connection.sync();
5653
return true;
5754
}
5855
catch ( Throwable e )

0 commit comments

Comments
 (0)