Skip to content

Commit 0b2b4b2

Browse files
author
Zhen Li
authored
Merge pull request #252 from pontusmelke/1.1-close-conn-on-driver-close
Close all connections on driver.close()
2 parents 60acb8a + f7c4c18 commit 0b2b4b2

File tree

8 files changed

+352
-143
lines changed

8 files changed

+352
-143
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public synchronized void reset()
139139
@Override
140140
public boolean isOpen()
141141
{
142-
return isOpen.get();
142+
return isOpen.get() && connection.isOpen();
143143
}
144144

145145
@Override
@@ -177,10 +177,6 @@ public void close()
177177
{
178178
connection.sync();
179179
}
180-
catch ( Throwable t )
181-
{
182-
throw t;
183-
}
184180
finally
185181
{
186182
closeConnection();
@@ -314,7 +310,7 @@ private void ensureConnectionIsOpen()
314310

315311
private void ensureSessionIsOpen()
316312
{
317-
if ( !isOpen() )
313+
if ( !isOpen.get() )
318314
{
319315
throw new ClientException(
320316
"No more interaction with this session is allowed " +
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
import org.neo4j.driver.internal.util.Supplier;
31+
32+
/**
33+
* A blocking queue that also keeps track of connections that are acquired in order
34+
* to facilitate termination of all connections.
35+
*/
36+
public class BlockingPooledConnectionQueue
37+
{
38+
/** The backing queue, keeps track of connections currently in queue */
39+
private final BlockingQueue<PooledConnection> queue;
40+
41+
private final AtomicBoolean isTerminating = new AtomicBoolean( false );
42+
43+
/** Keeps track of acquired connections */
44+
private final Set<PooledConnection> acquiredConnections =
45+
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());
46+
47+
public BlockingPooledConnectionQueue( int capacity )
48+
{
49+
this.queue = new LinkedBlockingQueue<>( capacity );
50+
}
51+
52+
/**
53+
* Offer a connections back to the queue
54+
*
55+
* @param pooledConnection the connection to put back to the queue
56+
* @return <code>true</code> if connections was accepted otherwise <code>false</code>
57+
*/
58+
public boolean offer( PooledConnection pooledConnection )
59+
{
60+
acquiredConnections.remove( pooledConnection );
61+
boolean offer = queue.offer( pooledConnection );
62+
// not added back to the queue, dispose of the connection
63+
if (!offer) {
64+
pooledConnection.dispose();
65+
}
66+
if (isTerminating.get()) {
67+
PooledConnection poll = queue.poll();
68+
if (poll != null)
69+
{
70+
poll.dispose();
71+
}
72+
}
73+
return offer;
74+
}
75+
76+
/**
77+
* Acquire connection or create a new one if the queue is empty
78+
* @param supplier used to create a new connection if queue is empty
79+
* @return a PooledConnection instance
80+
*/
81+
public PooledConnection acquire( Supplier<PooledConnection> supplier )
82+
{
83+
84+
PooledConnection poll = queue.poll();
85+
if ( poll == null )
86+
{
87+
poll = supplier.get();
88+
}
89+
acquiredConnections.add( poll );
90+
91+
if (isTerminating.get()) {
92+
acquiredConnections.remove( poll );
93+
poll.dispose();
94+
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
95+
}
96+
return poll;
97+
}
98+
99+
public List<PooledConnection> toList()
100+
{
101+
return new ArrayList<>( queue );
102+
}
103+
104+
public boolean isEmpty()
105+
{
106+
return queue.isEmpty();
107+
}
108+
109+
public int size()
110+
{
111+
return queue.size();
112+
}
113+
114+
public boolean contains( PooledConnection pooledConnection )
115+
{
116+
return queue.contains( pooledConnection );
117+
}
118+
119+
/**
120+
* Terminates all connections, both those that are currently in the queue as well
121+
* as those that have been acquired.
122+
*/
123+
public void terminate()
124+
{
125+
if (isTerminating.compareAndSet( false, true ))
126+
{
127+
while ( !queue.isEmpty() )
128+
{
129+
PooledConnection conn = queue.poll();
130+
if ( conn != null )
131+
{
132+
//close the underlying connection without adding it back to the queue
133+
conn.dispose();
134+
}
135+
}
136+
for ( PooledConnection pooledConnection : acquiredConnections )
137+
{
138+
pooledConnection.dispose();
139+
}
140+
}
141+
}
142+
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.concurrent.BlockingQueue;
2221
import java.util.concurrent.atomic.AtomicBoolean;
2322

2423
import org.neo4j.driver.internal.util.Consumer;
@@ -30,49 +29,22 @@
3029
*/
3130
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3231
{
33-
private final BlockingQueue<PooledConnection> connections;
34-
private final AtomicBoolean driverStopped;
32+
private final BlockingPooledConnectionQueue connections;
3533
private final Function<PooledConnection, Boolean> validConnection;
3634

37-
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
35+
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
3836
Function<PooledConnection, Boolean> validConnection)
3937
{
4038
this.connections = connections;
41-
this.driverStopped = driverStopped;
4239
this.validConnection = validConnection;
4340
}
4441

4542
@Override
4643
public void accept( PooledConnection pooledConnection )
4744
{
48-
if( driverStopped.get() )
45+
if ( validConnection.apply( pooledConnection ) )
4946
{
50-
// if the driver already closed, then no need to try to return to pool, just directly close this connection
51-
pooledConnection.dispose();
52-
}
53-
else if ( validConnection.apply( pooledConnection ) )
54-
{
55-
boolean released = connections.offer( pooledConnection );
56-
if( !released )
57-
{
58-
// if the connection could be put back to the pool, then we let the pool to manage it.
59-
// Otherwise, we close the connection directly here.
60-
pooledConnection.dispose();
61-
}
62-
else if ( driverStopped.get() )
63-
{
64-
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
65-
// then the loop where the driver is closing all available connections might not observe our newly
66-
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
67-
// which connection we get back, because other threads might be in the same situation as ours. It only
68-
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
69-
// *a* connection in response.
70-
PooledConnection conn = connections.poll();
71-
if ( conn != null )
72-
{
73-
conn.dispose();
74-
}
75-
}
47+
connections.offer( pooledConnection );
7648
}
7749
else
7850
{

0 commit comments

Comments
 (0)