Skip to content

Commit cc5f861

Browse files
committed
Changed to a simpler session pool
1 parent c7e0fba commit cc5f861

14 files changed

+190
-941
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java

Lines changed: 30 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@
2424
import java.util.LinkedList;
2525
import java.util.List;
2626
import java.util.ServiceLoader;
27+
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ConcurrentHashMap;
28-
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.LinkedBlockingQueue;
2930

3031
import org.neo4j.driver.internal.connector.socket.SocketConnector;
3132
import org.neo4j.driver.internal.spi.Connection;
3233
import org.neo4j.driver.internal.spi.ConnectionPool;
3334
import org.neo4j.driver.internal.spi.Connector;
3435
import org.neo4j.driver.internal.util.Clock;
35-
import org.neo4j.driver.internal.util.Consumer;
3636
import org.neo4j.driver.v1.AuthToken;
3737
import org.neo4j.driver.v1.Config;
3838
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -62,36 +62,23 @@ public class InternalConnectionPool implements ConnectionPool
6262
/**
6363
* Pools, organized by URL.
6464
*/
65-
private final ConcurrentHashMap<URI,ThreadCachingPool<PooledConnection>> pools = new ConcurrentHashMap<>();
66-
67-
/**
68-
* Connections that fail this criteria will be disposed of.
69-
*/
70-
private final ValidationStrategy<PooledConnection> connectionValidation;
65+
private final ConcurrentHashMap<URI,BlockingQueue<PooledConnection>> pools = new ConcurrentHashMap<>();
7166

7267
private final AuthToken authToken;
73-
/**
74-
* Timeout in milliseconds if there are no available sessions.
75-
*/
76-
private final long acquireSessionTimeout;
77-
7868
private final Clock clock;
7969
private final Config config;
8070

8171
public InternalConnectionPool( Config config, AuthToken authToken )
8272
{
83-
this( loadConnectors(), Clock.SYSTEM, config, authToken,
84-
Long.getLong( "neo4j.driver.acquireSessionTimeout", 30_000 ) );
73+
this( loadConnectors(), Clock.SYSTEM, config, authToken);
8574
}
8675

8776
public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config config,
88-
AuthToken authToken, long acquireTimeout )
77+
AuthToken authToken )
8978
{
9079
this.authToken = authToken;
91-
this.acquireSessionTimeout = acquireTimeout;
9280
this.config = config;
9381
this.clock = clock;
94-
this.connectionValidation = new PooledConnectionValidator( config.idleTimeBeforeConnectionTest() );
9582
for ( Connector connector : conns )
9683
{
9784
for ( String s : connector.supportedSchemes() )
@@ -104,37 +91,32 @@ public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config
10491
@Override
10592
public Connection acquire( URI sessionURI )
10693
{
107-
try
108-
{
109-
Connection conn = pool( sessionURI ).acquire( acquireSessionTimeout, TimeUnit.MILLISECONDS );
94+
BlockingQueue<PooledConnection> connections = pool( sessionURI );
95+
PooledConnection conn = connections.poll();
11096
if ( conn == null )
11197
{
112-
throw new ClientException(
113-
"Failed to acquire a session with Neo4j " +
114-
"as all the connections in the connection pool are already occupied by other sessions. " +
115-
"Please close unused session and retry. " +
116-
"Current Pool size: " + config.connectionPoolSize() +
117-
". If your application requires running more sessions concurrently than the current pool " +
118-
"size, you should create a driver with a larger connection pool size." );
98+
Connector connector = connectors.get( sessionURI.getScheme() );
99+
if ( connector == null )
100+
{
101+
throw new ClientException(
102+
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
103+
sessionURI.getScheme(), sessionURI, connectorSchemes() ) );
104+
}
105+
conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new PooledConnectionReleaseConsumer( connections, config ), clock);
119106
}
107+
conn.updateUsageTimestamp();
120108
return conn;
121-
}
122-
catch ( InterruptedException e )
123-
{
124-
throw new ClientException( "Interrupted while waiting for a connection to Neo4j." );
125-
}
126109
}
127110

128-
private ThreadCachingPool<PooledConnection> pool( URI sessionURI )
111+
private BlockingQueue<PooledConnection> pool( URI sessionURI )
129112
{
130-
ThreadCachingPool<PooledConnection> pool = pools.get( sessionURI );
113+
BlockingQueue<PooledConnection> pool = pools.get( sessionURI );
131114
if ( pool == null )
132115
{
133-
pool = newPool( sessionURI );
116+
pool = new LinkedBlockingQueue<>(config.maxIdleConnectionPoolSize());
134117
if ( pools.putIfAbsent( sessionURI, pool ) != null )
135118
{
136119
// We lost a race to create the pool, dispose of the one we created, and recurse
137-
pool.close();
138120
return pool( sessionURI );
139121
}
140122
}
@@ -161,48 +143,24 @@ private static Collection<Connector> loadConnectors()
161143
@Override
162144
public void close() throws Neo4jException
163145
{
164-
for ( ThreadCachingPool<PooledConnection> pool : pools.values() )
146+
for ( BlockingQueue<PooledConnection> pool : pools.values() )
165147
{
166-
pool.close();
148+
while ( !pool.isEmpty() )
149+
{
150+
PooledConnection conn = pool.poll();
151+
if ( conn != null )
152+
{
153+
//close the underlying connection without adding it back to the queue
154+
conn.dispose();
155+
}
156+
}
167157
}
158+
168159
pools.clear();
169160
}
170161

171162
private String connectorSchemes()
172163
{
173164
return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) );
174165
}
175-
176-
private ThreadCachingPool<PooledConnection> newPool( final URI uri )
177-
{
178-
179-
return new ThreadCachingPool<>( config.connectionPoolSize(), new Allocator<PooledConnection>()
180-
{
181-
@Override
182-
public PooledConnection allocate( Consumer<PooledConnection> release )
183-
{
184-
Connector connector = connectors.get( uri.getScheme() );
185-
if ( connector == null )
186-
{
187-
throw new ClientException(
188-
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
189-
uri.getScheme(), uri, connectorSchemes() ) );
190-
}
191-
Connection conn = connector.connect( uri, config, authToken );
192-
return new PooledConnection( conn, release );
193-
}
194-
195-
@Override
196-
public void onDispose( PooledConnection pooledConnection )
197-
{
198-
pooledConnection.dispose();
199-
}
200-
201-
@Override
202-
public void onAcquire( PooledConnection pooledConnection )
203-
{
204-
205-
}
206-
}, connectionValidation, clock );
207-
}
208166
}

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.neo4j.driver.internal.spi.Connection;
2424
import org.neo4j.driver.internal.spi.StreamCollector;
25+
import org.neo4j.driver.internal.util.Clock;
2526
import org.neo4j.driver.internal.util.Consumer;
2627
import org.neo4j.driver.v1.Value;
2728
import org.neo4j.driver.v1.exceptions.Neo4jException;
@@ -30,16 +31,24 @@ public class PooledConnection implements Connection
3031
{
3132
/** The real connection who will do all the real jobs */
3233
private final Connection delegate;
33-
/** A reference to the {@link ThreadCachingPool pool} so that we could return this resource back */
3434
private final Consumer<PooledConnection> release;
3535

3636
private boolean unrecoverableErrorsOccurred = false;
3737
private Runnable onError = null;
38+
private final Clock clock;
39+
private long lastUsed;
3840

39-
public PooledConnection( Connection delegate, Consumer<PooledConnection> release )
41+
public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
4042
{
4143
this.delegate = delegate;
4244
this.release = release;
45+
this.clock = clock;
46+
this.lastUsed = clock.millis();
47+
}
48+
49+
public void updateUsageTimestamp()
50+
{
51+
lastUsed = clock.millis();
4352
}
4453

4554
@Override
@@ -221,4 +230,9 @@ private boolean isClientOrTransientError( RuntimeException e )
221230
&& (((Neo4jException) e).neo4jErrorCode().contains( "ClientError" )
222231
|| ((Neo4jException) e).neo4jErrorCode().contains( "TransientError" ));
223232
}
233+
234+
public long idleTime()
235+
{
236+
return clock.millis() - lastUsed;
237+
}
224238
}

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionValidator.java renamed to driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,42 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.BlockingQueue;
2324

2425
import org.neo4j.driver.internal.spi.StreamCollector;
26+
import org.neo4j.driver.internal.util.Consumer;
27+
import org.neo4j.driver.v1.Config;
2528
import org.neo4j.driver.v1.Value;
2629

2730
/**
28-
* Validates connections - determining if they are ok to keep in the pool, or if they should be disposed of.
31+
* The responsibility of the PooledConnectionReleaseConsumer is to release valid connections
32+
* back to the connections queue.
2933
*/
30-
public class PooledConnectionValidator implements ValidationStrategy<PooledConnection>
34+
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3135
{
32-
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
33-
34-
/**
35-
* Connections that have been idle longer than this threshold will have a ping test performed on them.
36-
*/
36+
private final BlockingQueue<PooledConnection> connections;
3737
private final long minIdleBeforeConnectionTest;
38+
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
3839

39-
public PooledConnectionValidator( long minIdleBeforeConnectionTest )
40+
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, Config config )
4041
{
41-
this.minIdleBeforeConnectionTest = minIdleBeforeConnectionTest;
42+
this.connections = connections;
43+
this.minIdleBeforeConnectionTest = config.idleTimeBeforeConnectionTest();
4244
}
4345

4446
@Override
45-
public boolean isValid( PooledConnection conn, long idleTime )
47+
public void accept( PooledConnection pooledConnection )
4648
{
47-
if ( conn.hasUnrecoverableErrors() )
49+
if ( validConnection( pooledConnection ) )
4850
{
49-
return false;
51+
connections.offer( pooledConnection );
5052
}
53+
}
5154

52-
return idleTime <= minIdleBeforeConnectionTest || ping( conn );
55+
private boolean validConnection( PooledConnection pooledConnection )
56+
{
57+
return !pooledConnection.hasUnrecoverableErrors() &&
58+
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));
5359
}
5460

5561
private boolean ping( PooledConnection conn )
@@ -60,7 +66,8 @@ private boolean ping( PooledConnection conn )
6066
conn.pullAll( StreamCollector.NO_OP );
6167
conn.sync();
6268
return true;
63-
} catch( Throwable e )
69+
}
70+
catch ( Throwable e )
6471
{
6572
return false;
6673
}

0 commit comments

Comments
 (0)