Skip to content

Commit 8bc9779

Browse files
author
Zhen
committed
Added the close connection logic into the new pool
When the pool is full, close the connection directly When driver is closed, session.close will close the connection directly
1 parent dd2caa0 commit 8bc9779

File tree

4 files changed

+131
-22
lines changed

4 files changed

+131
-22
lines changed

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031

3132
import org.neo4j.driver.internal.connector.socket.SocketConnector;
3233
import org.neo4j.driver.internal.spi.Connection;
@@ -68,6 +69,9 @@ public class InternalConnectionPool implements ConnectionPool
6869
private final Clock clock;
6970
private final Config config;
7071

72+
/** Shutdown flag */
73+
private final AtomicBoolean stopped = new AtomicBoolean( false );
74+
7175
public InternalConnectionPool( Config config, AuthToken authToken )
7276
{
7377
this( loadConnectors(), Clock.SYSTEM, config, authToken);
@@ -91,21 +95,26 @@ public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config
9195
@Override
9296
public Connection acquire( URI sessionURI )
9397
{
94-
BlockingQueue<PooledConnection> connections = pool( sessionURI );
95-
PooledConnection conn = connections.poll();
96-
if ( conn == null )
98+
if ( stopped.get() )
99+
{
100+
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
101+
}
102+
BlockingQueue<PooledConnection> connections = pool( sessionURI );
103+
PooledConnection conn = connections.poll();
104+
if ( conn == null )
105+
{
106+
Connector connector = connectors.get( sessionURI.getScheme() );
107+
if ( connector == null )
97108
{
98-
Connector connector = connectors.get( sessionURI.getScheme() );
99-
if ( connector == null )
100-
{
101-
throw new ClientException(
102-
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
103-
sessionURI.getScheme(), sessionURI, connectorSchemes() ) );
104-
}
105-
conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new PooledConnectionReleaseConsumer( connections, config ), clock);
109+
throw new ClientException(
110+
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
111+
sessionURI.getScheme(), sessionURI, connectorSchemes() ) );
106112
}
107-
conn.updateUsageTimestamp();
108-
return conn;
113+
conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new
114+
PooledConnectionReleaseConsumer( connections, stopped, config ), clock);
115+
}
116+
conn.updateUsageTimestamp();
117+
return conn;
109118
}
110119

111120
private BlockingQueue<PooledConnection> pool( URI sessionURI )
@@ -143,6 +152,12 @@ private static Collection<Connector> loadConnectors()
143152
@Override
144153
public void close() throws Neo4jException
145154
{
155+
if( !stopped.compareAndSet( false, true ) )
156+
{
157+
// already closed or some other thread already started close
158+
return;
159+
}
160+
146161
for ( BlockingQueue<PooledConnection> pool : pools.values() )
147162
{
148163
while ( !pool.isEmpty() )

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425

2526
import org.neo4j.driver.internal.spi.StreamCollector;
2627
import org.neo4j.driver.internal.util.Consumer;
@@ -36,23 +37,37 @@ class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3637
private final BlockingQueue<PooledConnection> connections;
3738
private final long minIdleBeforeConnectionTest;
3839
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
40+
private final AtomicBoolean driverStopped;
3941

40-
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, Config config )
42+
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
43+
Config config )
4144
{
4245
this.connections = connections;
46+
this.driverStopped = driverStopped;
4347
this.minIdleBeforeConnectionTest = config.idleTimeBeforeConnectionTest();
4448
}
4549

4650
@Override
4751
public void accept( PooledConnection pooledConnection )
4852
{
49-
if ( validConnection( pooledConnection ) )
53+
if( driverStopped.get() )
5054
{
51-
connections.offer( pooledConnection );
55+
// if the driver already closed, then no need to try to return to pool, just directly close this connection
56+
pooledConnection.dispose();
57+
}
58+
else if ( validConnection( pooledConnection ) )
59+
{
60+
boolean released = connections.offer( pooledConnection );
61+
if( !released )
62+
{
63+
// if the connection could be put back to the pool, then we let the pool to manage it.
64+
// Otherwise, we close the connection directly here.
65+
pooledConnection.dispose();
66+
}
5267
}
5368
}
5469

55-
private boolean validConnection( PooledConnection pooledConnection )
70+
boolean validConnection( PooledConnection pooledConnection )
5671
{
5772
return !pooledConnection.hasUnrecoverableErrors() &&
5873
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));

driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627

2728
import org.neo4j.driver.internal.spi.Connection;
2829
import org.neo4j.driver.internal.util.Clock;
@@ -64,7 +65,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable
6465
// When/Then
6566
BlockingQueue<PooledConnection> queue = mock( BlockingQueue.class );
6667
PooledConnectionReleaseConsumer consumer =
67-
new PooledConnectionReleaseConsumer( queue, config );
68+
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config );
6869
consumer.accept( conn );
6970

7071
verify( queue, never() ).add( conn );
@@ -83,7 +84,7 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
8384
// When/Then
8485
BlockingQueue<PooledConnection> queue = mock( BlockingQueue.class );
8586
PooledConnectionReleaseConsumer consumer =
86-
new PooledConnectionReleaseConsumer( queue, config );
87+
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config );
8788
consumer.accept( conn );
8889

8990
verify( queue ).offer( conn );
@@ -131,7 +132,7 @@ private void assertUnrecoverable( Neo4jException exception )
131132
assertTrue( conn.hasUnrecoverableErrors() );
132133
BlockingQueue<PooledConnection> queue = mock( BlockingQueue.class );
133134
PooledConnectionReleaseConsumer consumer =
134-
new PooledConnectionReleaseConsumer( queue, Config.defaultConfig() );
135+
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), Config.defaultConfig() );
135136
consumer.accept( conn );
136137

137138
verify( queue, never() ).offer( conn );
@@ -157,7 +158,7 @@ private void assertRecoverable( Neo4jException exception )
157158
assertFalse( conn.hasUnrecoverableErrors() );
158159
BlockingQueue<PooledConnection> queue = mock( BlockingQueue.class );
159160
PooledConnectionReleaseConsumer consumer =
160-
new PooledConnectionReleaseConsumer( queue, Config.defaultConfig() );
161+
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), Config.defaultConfig() );
161162
consumer.accept( conn );
162163

163164
verify( queue ).offer( conn );

driver/src/test/java/org/neo4j/driver/internal/pool/PooledConnectionTest.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121
import org.junit.Test;
2222

2323
import java.util.LinkedList;
24+
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2427

2528
import org.neo4j.driver.internal.spi.Connection;
2629
import org.neo4j.driver.internal.spi.StreamCollector;
2730
import org.neo4j.driver.internal.util.Clock;
2831
import org.neo4j.driver.internal.util.Consumer;
32+
import org.neo4j.driver.v1.Config;
2933
import org.neo4j.driver.v1.exceptions.DatabaseException;
3034

3135
import static org.hamcrest.CoreMatchers.equalTo;
@@ -62,4 +66,78 @@ public void accept( PooledConnection pooledConnection )
6266
assertThat( returnedToPool, hasItem(pooledConnection) );
6367
assertThat( returnedToPool.size(), equalTo( 1 ));
6468
}
65-
}
69+
70+
71+
@Test
72+
public void shouldCallDisposeToCloseConnectionDirectlyIfIdlePoolIsFull() throws Throwable
73+
{
74+
// Given
75+
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(1);
76+
77+
final boolean[] flags = {false};
78+
79+
Connection conn = mock( Connection.class );
80+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
81+
new AtomicBoolean( false ), Config.defaultConfig() /*Does not matter what config for this test*/ )
82+
{
83+
@Override
84+
boolean validConnection( PooledConnection conn )
85+
{
86+
return true;
87+
}
88+
};
89+
90+
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM );
91+
PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
92+
{
93+
@Override
94+
public void dispose()
95+
{
96+
flags[0] = true;
97+
}
98+
};
99+
100+
// When
101+
pooledConnection.close();
102+
shouldBeClosedConnection.close();
103+
104+
// Then
105+
assertThat( pool, hasItem(pooledConnection) );
106+
assertThat( pool.size(), equalTo( 1 ) );
107+
assertThat( flags[0], equalTo( true ) );
108+
}
109+
110+
@Test
111+
public void shouldCallDisposeToCloseConnectionIfDriverCloseBeforeSessionClose() throws Throwable
112+
{
113+
// driver = GraphDatabase.driver();
114+
// session = driver.session();
115+
// ...
116+
// driver.close() -> clear the pools
117+
// session.close() -> well, close the connection directly without putting back to the pool
118+
119+
// Given
120+
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(1);
121+
final boolean[] flags = {false};
122+
123+
Connection conn = mock( Connection.class );
124+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
125+
new AtomicBoolean( true ), Config.defaultConfig() /*Does not matter what config for this test*/ );
126+
127+
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
128+
{
129+
@Override
130+
public void dispose()
131+
{
132+
flags[0] = true;
133+
}
134+
};
135+
136+
// When
137+
pooledConnection.close();
138+
139+
// Then
140+
assertThat( pool.size(), equalTo( 0 ) );
141+
assertThat( flags[0], equalTo( true ) ); // make sure that the dispose is called
142+
}
143+
}

0 commit comments

Comments
 (0)