Skip to content

Commit f7c4c18

Browse files
committed
Fixes from code review
1 parent 8cfe635 commit f7c4c18

File tree

5 files changed

+67
-95
lines changed

5 files changed

+67
-95
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.neo4j.driver.internal.util.Supplier;
3031

@@ -37,6 +38,8 @@ public class BlockingPooledConnectionQueue
3738
/** The backing queue, keeps track of connections currently in queue */
3839
private final BlockingQueue<PooledConnection> queue;
3940

41+
private final AtomicBoolean isTerminating = new AtomicBoolean( false );
42+
4043
/** Keeps track of acquired connections */
4144
private final Set<PooledConnection> acquiredConnections =
4245
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());
@@ -54,14 +57,45 @@ public BlockingPooledConnectionQueue( int capacity )
5457
*/
5558
public boolean offer( PooledConnection pooledConnection )
5659
{
60+
acquiredConnections.remove( pooledConnection );
5761
boolean offer = queue.offer( pooledConnection );
58-
if ( offer )
59-
{
60-
acquiredConnections.remove( pooledConnection );
62+
// not added back to the queue, dispose of the connection
63+
if (!offer) {
64+
pooledConnection.dispose();
65+
}
66+
if (isTerminating.get()) {
67+
PooledConnection poll = queue.poll();
68+
if (poll != null)
69+
{
70+
poll.dispose();
71+
}
6172
}
6273
return offer;
6374
}
6475

76+
/**
77+
* Acquire connection or create a new one if the queue is empty
78+
* @param supplier used to create a new connection if queue is empty
79+
* @return a PooledConnection instance
80+
*/
81+
public PooledConnection acquire( Supplier<PooledConnection> supplier )
82+
{
83+
84+
PooledConnection poll = queue.poll();
85+
if ( poll == null )
86+
{
87+
poll = supplier.get();
88+
}
89+
acquiredConnections.add( poll );
90+
91+
if (isTerminating.get()) {
92+
acquiredConnections.remove( poll );
93+
poll.dispose();
94+
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
95+
}
96+
return poll;
97+
}
98+
6599
public List<PooledConnection> toList()
66100
{
67101
return new ArrayList<>( queue );
@@ -88,34 +122,21 @@ public boolean contains( PooledConnection pooledConnection )
88122
*/
89123
public void terminate()
90124
{
91-
while ( !queue.isEmpty() )
125+
if (isTerminating.compareAndSet( false, true ))
92126
{
93-
PooledConnection conn = queue.poll();
94-
if ( conn != null )
127+
while ( !queue.isEmpty() )
95128
{
96-
//close the underlying connection without adding it back to the queue
97-
conn.dispose();
129+
PooledConnection conn = queue.poll();
130+
if ( conn != null )
131+
{
132+
//close the underlying connection without adding it back to the queue
133+
conn.dispose();
134+
}
135+
}
136+
for ( PooledConnection pooledConnection : acquiredConnections )
137+
{
138+
pooledConnection.dispose();
98139
}
99140
}
100-
for ( PooledConnection pooledConnection : acquiredConnections )
101-
{
102-
pooledConnection.dispose();
103-
}
104-
}
105-
106-
/**
107-
* Acquire connection or create a new one if the queue is empty
108-
* @param supplier used to create a new connection if queue is empty
109-
* @return a PooledConnection instance
110-
*/
111-
public PooledConnection acquire( Supplier<PooledConnection> supplier )
112-
{
113-
PooledConnection poll = queue.poll();
114-
if ( poll == null )
115-
{
116-
poll = supplier.get();
117-
}
118-
acquiredConnections.add( poll );
119-
return poll;
120141
}
121142
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,44 +30,21 @@
3030
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3131
{
3232
private final BlockingPooledConnectionQueue connections;
33-
private final AtomicBoolean driverStopped;
3433
private final Function<PooledConnection, Boolean> validConnection;
3534

36-
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, AtomicBoolean driverStopped,
35+
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
3736
Function<PooledConnection, Boolean> validConnection)
3837
{
3938
this.connections = connections;
40-
this.driverStopped = driverStopped;
4139
this.validConnection = validConnection;
4240
}
4341

4442
@Override
4543
public void accept( PooledConnection pooledConnection )
4644
{
47-
if( driverStopped.get() )
45+
if ( validConnection.apply( pooledConnection ) )
4846
{
49-
// if the driver already closed, then no need to try to return to pool, just directly close this connection
50-
pooledConnection.dispose();
51-
}
52-
else if ( validConnection.apply( pooledConnection ) )
53-
{
54-
boolean released = connections.offer( pooledConnection );
55-
if( !released )
56-
{
57-
// if the connection could be put back to the pool, then we let the pool to manage it.
58-
// Otherwise, we close the connection directly here.
59-
pooledConnection.dispose();
60-
}
61-
else if ( driverStopped.get() )
62-
{
63-
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
64-
// then the loop where the driver is closing all available connections might not observe our newly
65-
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
66-
// which connection we get back, because other threads might be in the same situation as ours. It only
67-
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
68-
// *a* connection in response.
69-
connections.terminate();
70-
}
47+
connections.offer( pooledConnection );
7148
}
7249
else
7350
{

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public class SocketConnectionPool implements ConnectionPool
6969
private final Logging logging;
7070

7171
/** Shutdown flag */
72-
private final AtomicBoolean stopped = new AtomicBoolean( false );
7372

7473
public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
7574
PoolSettings poolSettings, Logging logging )
@@ -108,18 +107,14 @@ private static Map<String,Value> tokenAsMap( AuthToken token )
108107
@Override
109108
public Connection acquire( final BoltServerAddress address )
110109
{
111-
if ( stopped.get() )
112-
{
113-
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
114-
}
115110
final BlockingPooledConnectionQueue connections = pool( address );
116111
Supplier<PooledConnection> supplier = new Supplier<PooledConnection>()
117112
{
118113
@Override
119114
public PooledConnection get()
120115
{
121116
return new PooledConnection( connect( address ), new
122-
PooledConnectionReleaseConsumer( connections, stopped,
117+
PooledConnectionReleaseConsumer( connections,
123118
new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock );
124119

125120
}
@@ -166,12 +161,6 @@ public boolean hasAddress( BoltServerAddress address )
166161
@Override
167162
public void close()
168163
{
169-
if ( !stopped.compareAndSet( false, true ) )
170-
{
171-
// already closed or some other thread already started close
172-
return;
173-
}
174-
175164
for ( BlockingPooledConnectionQueue pool : pools.values() )
176165
{
177166
pool.terminate();

driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable
7878
new PooledConnectionValidator( pool( true ), poolSettings );
7979

8080
PooledConnectionReleaseConsumer consumer =
81-
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator);
81+
new PooledConnectionReleaseConsumer( queue, validator);
8282
consumer.accept( conn );
8383

8484
verify( queue, never() ).offer( conn );
@@ -102,7 +102,7 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable
102102
BlockingPooledConnectionQueue
103103
queue = mock( BlockingPooledConnectionQueue.class );
104104
PooledConnectionReleaseConsumer consumer =
105-
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator );
105+
new PooledConnectionReleaseConsumer( queue,validator );
106106
consumer.accept( conn );
107107

108108
verify( queue ).offer( conn );
@@ -121,7 +121,7 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable
121121
BlockingPooledConnectionQueue
122122
queue = mock( BlockingPooledConnectionQueue.class );
123123
PooledConnectionReleaseConsumer consumer =
124-
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
124+
new PooledConnectionReleaseConsumer( queue, validator );
125125
consumer.accept( conn );
126126

127127
verify( queue, never() ).offer( conn );
@@ -174,7 +174,7 @@ private void assertUnrecoverable( Neo4jException exception )
174174
BlockingPooledConnectionQueue
175175
queue = mock( BlockingPooledConnectionQueue.class );
176176
PooledConnectionReleaseConsumer consumer =
177-
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
177+
new PooledConnectionReleaseConsumer( queue, validator );
178178
consumer.accept( conn );
179179

180180
verify( queue, never() ).offer( conn );
@@ -204,7 +204,7 @@ private void assertRecoverable( Neo4jException exception )
204204
BlockingPooledConnectionQueue
205205
queue = mock( BlockingPooledConnectionQueue.class );
206206
PooledConnectionReleaseConsumer consumer =
207-
new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator );
207+
new PooledConnectionReleaseConsumer( queue, validator );
208208
consumer.accept( conn );
209209

210210
verify( queue ).offer( conn );

driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ public void shouldDisposeConnectionIfNotValidConnection() throws Throwable
6868
final boolean[] flags = {false};
6969

7070
Connection conn = mock( Connection.class );
71-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
72-
new AtomicBoolean( false ), INVALID_CONNECTION );
71+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, INVALID_CONNECTION );
7372

7473

7574
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
@@ -99,8 +98,7 @@ public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throw
9998
final boolean[] flags = {false};
10099

101100
Connection conn = mock( Connection.class );
102-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
103-
new AtomicBoolean( false ), VALID_CONNECTION );
101+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION );
104102

105103
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
106104
{
@@ -131,8 +129,7 @@ public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws T
131129
final boolean[] flags = {false};
132130

133131
Connection conn = mock( Connection.class );
134-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
135-
new AtomicBoolean( false ), VALID_CONNECTION);
132+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);
136133

137134
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM );
138135
PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
@@ -164,13 +161,12 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable
164161
// session.close() -> well, close the connection directly without putting back to the pool
165162

166163
// Given
167-
final BlockingPooledConnectionQueue
168-
pool = new BlockingPooledConnectionQueue(1);
164+
final BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue(1);
165+
pool.terminate();
169166
final boolean[] flags = {false};
170167

171168
Connection conn = mock( Connection.class );
172-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
173-
new AtomicBoolean( true ), VALID_CONNECTION);
169+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);
174170

175171
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
176172
{
@@ -193,25 +189,14 @@ public void dispose()
193189
public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool() throws Throwable
194190
{
195191
// Given
196-
final AtomicBoolean stopped = new AtomicBoolean( false );
197192
final BlockingPooledConnectionQueue
198-
pool = new BlockingPooledConnectionQueue(1){
199-
public boolean offer(PooledConnection conn)
200-
{
201-
stopped.set( true );
202-
// some clean work to close all connection in pool
203-
boolean offer = super.offer( conn );
204-
assertThat ( this.size(), equalTo( 1 ) );
205-
// we successfully put the connection back to the pool
206-
return offer;
207-
}
208-
};
193+
pool = new BlockingPooledConnectionQueue(1);
194+
pool.terminate();
209195
final boolean[] flags = {false};
210196

211197
Connection conn = mock( Connection.class );
212198

213-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
214-
stopped , VALID_CONNECTION);
199+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, VALID_CONNECTION);
215200

216201
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
217202
{

0 commit comments

Comments
 (0)