Skip to content

Commit bc3fe06

Browse files
committed
Fail-fast interactions with closed driver
It was previously possible to use closed driver instance, obtain sessions from it and run queries. This was seen to cause problems in multi-threaded scenarios when some worker threads try to use driver, which was concurrently closed by other worker. This commit makes driver track if it is closed or not and throw exceptions if on every interaction when closed. Thrown exception is `IllegalStateException`. Actuall `#close()` method can be called multiple times and is a no-op for already closed driver.
1 parent 714b5a2 commit bc3fe06

File tree

5 files changed

+331
-23
lines changed

5 files changed

+331
-23
lines changed

driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,26 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
2019
package org.neo4j.driver.internal;
2120

21+
import java.util.concurrent.locks.ReentrantReadWriteLock;
22+
2223
import org.neo4j.driver.internal.security.SecurityPlan;
24+
import org.neo4j.driver.v1.AccessMode;
2325
import org.neo4j.driver.v1.Driver;
2426
import org.neo4j.driver.v1.Logger;
2527
import org.neo4j.driver.v1.Logging;
28+
import org.neo4j.driver.v1.Session;
2629

2730
abstract class BaseDriver implements Driver
2831
{
32+
private final static String DRIVER_LOG_NAME = "Driver";
33+
2934
private final SecurityPlan securityPlan;
3035
protected final Logger log;
31-
private final static String DRIVER_LOG_NAME = "Driver";
36+
37+
private final ReentrantReadWriteLock closedLock = new ReentrantReadWriteLock();
38+
private boolean closed;
3239

3340
BaseDriver( SecurityPlan securityPlan, Logging logging )
3441
{
@@ -37,8 +44,68 @@ abstract class BaseDriver implements Driver
3744
}
3845

3946
@Override
40-
public boolean isEncrypted()
47+
public final boolean isEncrypted()
48+
{
49+
closedLock.readLock().lock();
50+
try
51+
{
52+
assertOpen();
53+
return securityPlan.requiresEncryption();
54+
}
55+
finally
56+
{
57+
closedLock.readLock().unlock();
58+
}
59+
}
60+
61+
@Override
62+
public final Session session()
63+
{
64+
return session( AccessMode.WRITE );
65+
}
66+
67+
@Override
68+
public final Session session( AccessMode mode )
69+
{
70+
closedLock.readLock().lock();
71+
try
72+
{
73+
assertOpen();
74+
return newSessionWithMode( mode );
75+
}
76+
finally
77+
{
78+
closedLock.readLock().unlock();
79+
}
80+
}
81+
82+
@Override
83+
public final void close()
84+
{
85+
closedLock.writeLock().lock();
86+
try
87+
{
88+
if ( !closed )
89+
{
90+
closeResources();
91+
}
92+
}
93+
finally
94+
{
95+
closed = true;
96+
closedLock.writeLock().unlock();
97+
}
98+
}
99+
100+
protected abstract Session newSessionWithMode( AccessMode mode );
101+
102+
protected abstract void closeResources();
103+
104+
private void assertOpen()
41105
{
42-
return securityPlan.requiresEncryption();
106+
if ( closed )
107+
{
108+
throw new IllegalStateException( "This driver instance has already been closed" );
109+
}
43110
}
44111
}

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,13 @@ public DirectDriver(
4444
}
4545

4646
@Override
47-
public Session session()
47+
protected Session newSessionWithMode( AccessMode mode )
4848
{
4949
return new NetworkSession( connections.acquire( address ) );
5050
}
5151

5252
@Override
53-
public Session session( AccessMode ignore )
54-
{
55-
return session();
56-
}
57-
58-
@Override
59-
public void close()
53+
protected void closeResources()
6054
{
6155
try
6256
{

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,16 +59,11 @@ public RoutingDriver(
5959
}
6060

6161
@Override
62-
public Session session()
63-
{
64-
return session( AccessMode.WRITE );
65-
}
66-
67-
@Override
68-
public Session session( final AccessMode mode )
62+
protected Session newSessionWithMode( AccessMode mode )
6963
{
7064
Connection connection = acquireConnection( mode );
71-
return new RoutingNetworkSession( new NetworkSession( connection ), mode, connection.boltServerAddress(), loadBalancer );
65+
NetworkSession networkSession = new NetworkSession( connection );
66+
return new RoutingNetworkSession( networkSession, mode, connection.boltServerAddress(), loadBalancer );
7267
}
7368

7469
private Connection acquireConnection( AccessMode role )
@@ -85,7 +80,7 @@ private Connection acquireConnection( AccessMode role )
8580
}
8681

8782
@Override
88-
public void close()
83+
protected void closeResources()
8984
{
9085
try
9186
{

driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,22 @@
2020

2121
import org.junit.Test;
2222

23-
import java.util.concurrent.atomic.AtomicBoolean;
24-
2523
import org.neo4j.driver.internal.spi.Connection;
2624
import org.neo4j.driver.internal.util.Clock;
25+
import org.neo4j.driver.internal.util.Supplier;
2726
import org.neo4j.driver.v1.exceptions.ClientException;
2827
import org.neo4j.driver.v1.util.Function;
2928

3029
import static org.hamcrest.CoreMatchers.equalTo;
3130
import static org.hamcrest.MatcherAssert.assertThat;
31+
import static org.junit.Assert.assertSame;
3232
import static org.junit.Assert.assertTrue;
3333
import static org.junit.Assert.fail;
3434
import static org.mockito.Mockito.doThrow;
3535
import static org.mockito.Mockito.mock;
3636
import static org.mockito.Mockito.times;
3737
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.when;
3839

3940
public class PooledConnectionTest
4041
{
@@ -151,6 +152,56 @@ public void dispose()
151152
assertThat( flags[0], equalTo( true ) );
152153
}
153154

155+
@Test
156+
@SuppressWarnings( "unchecked" )
157+
public void shouldDisposeAcquiredConnectionsWhenPoolIsClosed()
158+
{
159+
PooledConnection connection = mock( PooledConnection.class );
160+
161+
BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue( 5 );
162+
163+
Supplier<PooledConnection> pooledConnectionFactory = mock( Supplier.class );
164+
when( pooledConnectionFactory.get() ).thenReturn( connection );
165+
166+
PooledConnection acquiredConnection = pool.acquire( pooledConnectionFactory );
167+
assertSame( acquiredConnection, connection );
168+
169+
pool.terminate();
170+
verify( connection ).dispose();
171+
}
172+
173+
@Test
174+
@SuppressWarnings( "unchecked" )
175+
public void shouldDisposeAcquiredAndIdleConnectionsWhenPoolIsClosed()
176+
{
177+
PooledConnection connection1 = mock( PooledConnection.class );
178+
PooledConnection connection2 = mock( PooledConnection.class );
179+
PooledConnection connection3 = mock( PooledConnection.class );
180+
181+
BlockingPooledConnectionQueue pool = new BlockingPooledConnectionQueue( 5 );
182+
183+
Supplier<PooledConnection> pooledConnectionFactory = mock( Supplier.class );
184+
when( pooledConnectionFactory.get() )
185+
.thenReturn( connection1 )
186+
.thenReturn( connection2 )
187+
.thenReturn( connection3 );
188+
189+
PooledConnection acquiredConnection1 = pool.acquire( pooledConnectionFactory );
190+
PooledConnection acquiredConnection2 = pool.acquire( pooledConnectionFactory );
191+
PooledConnection acquiredConnection3 = pool.acquire( pooledConnectionFactory );
192+
assertSame( acquiredConnection1, connection1 );
193+
assertSame( acquiredConnection2, connection2 );
194+
assertSame( acquiredConnection3, connection3 );
195+
196+
pool.offer( acquiredConnection2 );
197+
198+
pool.terminate();
199+
200+
verify( connection1 ).dispose();
201+
verify( connection2 ).dispose();
202+
verify( connection3 ).dispose();
203+
}
204+
154205
@Test
155206
public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable
156207
{

0 commit comments

Comments
 (0)