Skip to content

Commit 2aaa75b

Browse files
authored
Merge pull request #414 from lutovich/1.5-driver-close-async
Expose `Driver#closeAsync()`
2 parents 1fd39df + f3a8f92 commit 2aaa75b

File tree

13 files changed

+131
-40
lines changed

13 files changed

+131
-40
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.CompletionStage;
2222

2323
import org.neo4j.driver.internal.async.AsyncConnection;
24-
import org.neo4j.driver.internal.async.Futures;
2524
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
2625
import org.neo4j.driver.internal.net.BoltServerAddress;
2726
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -61,10 +60,18 @@ public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode
6160
}
6261

6362
@Override
64-
public void close() throws Exception
63+
public CompletionStage<Void> close()
6564
{
66-
pool.close();
67-
Futures.getBlocking( asyncPool.closeAsync() );
65+
// todo: remove this try-catch when blocking API works on top of async
66+
try
67+
{
68+
pool.close();
69+
}
70+
catch ( Exception e )
71+
{
72+
throw new RuntimeException( e );
73+
}
74+
return asyncPool.close();
6875
}
6976

7077
public BoltServerAddress getAddress()

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
9292
try
9393
{
9494
connectionPool.close();
95-
Futures.getBlocking( asyncConnectionPool.closeAsync() );
95+
Futures.getBlocking( asyncConnectionPool.close() );
9696
}
9797
catch ( Throwable closeError )
9898
{

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,18 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

24+
import org.neo4j.driver.internal.async.Futures;
2325
import org.neo4j.driver.internal.security.SecurityPlan;
2426
import org.neo4j.driver.v1.AccessMode;
2527
import org.neo4j.driver.v1.Driver;
2628
import org.neo4j.driver.v1.Logger;
2729
import org.neo4j.driver.v1.Logging;
2830
import org.neo4j.driver.v1.Session;
2931

30-
import static java.lang.String.format;
32+
import static java.util.concurrent.CompletableFuture.completedFuture;
3133

3234
public class InternalDriver implements Driver
3335
{
@@ -95,23 +97,26 @@ private Session newSession( AccessMode mode, Bookmark bookmark )
9597
Session session = sessionFactory.newInstance( mode, bookmark );
9698
if ( closed.get() )
9799
{
98-
// the driver is already closed and we either 1. obtain this session from the old session pool
99-
// or 2. we obtain this session from a new session pool
100-
// For 1. this closeResources will take no effect as everything is already closed.
101-
// For 2. this closeResources will close the new connection pool just created to ensure no resource leak.
102-
closeResources();
100+
// session does not immediately acquire connection, it is fine to just throw
103101
throw driverCloseException();
104102
}
105103
return session;
106104
}
107105

108106
@Override
109107
public final void close()
108+
{
109+
Futures.getBlocking( closeAsync() );
110+
}
111+
112+
@Override
113+
public CompletionStage<Void> closeAsync()
110114
{
111115
if ( closed.compareAndSet( false, true ) )
112116
{
113-
closeResources();
117+
return sessionFactory.close();
114118
}
119+
return completedFuture( null );
115120
}
116121

117122
/**
@@ -126,18 +131,6 @@ public final SessionFactory getSessionFactory()
126131
return sessionFactory;
127132
}
128133

129-
private void closeResources()
130-
{
131-
try
132-
{
133-
sessionFactory.close();
134-
}
135-
catch ( Exception ex )
136-
{
137-
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
138-
}
139-
}
140-
141134
private void assertOpen()
142135
{
143136
if ( closed.get() )

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.v1.AccessMode;
2224
import org.neo4j.driver.v1.Session;
2325

24-
public interface SessionFactory extends AutoCloseable
26+
public interface SessionFactory
2527
{
2628
Session newInstance( AccessMode mode, Bookmark bookmark );
29+
30+
CompletionStage<Void> close();
2731
}

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.internal.retry.RetryLogic;
2224
import org.neo4j.driver.internal.spi.ConnectionProvider;
2325
import org.neo4j.driver.v1.AccessMode;
@@ -57,9 +59,9 @@ protected NetworkSession createSession( ConnectionProvider connectionProvider, R
5759
}
5860

5961
@Override
60-
public final void close() throws Exception
62+
public final CompletionStage<Void> close()
6163
{
62-
connectionProvider.close();
64+
return connectionProvider.close();
6365
}
6466

6567
/**

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ public interface AsyncConnectionPool
3333

3434
int activeConnections( BoltServerAddress address );
3535

36-
CompletionStage<?> closeAsync();
36+
CompletionStage<Void> close();
3737
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public int activeConnections( BoltServerAddress address )
109109
}
110110

111111
@Override
112-
public CompletionStage<?> closeAsync()
112+
public CompletionStage<Void> close()
113113
{
114114
if ( closed.compareAndSet( false, true ) )
115115
{
@@ -128,7 +128,8 @@ public CompletionStage<?> closeAsync()
128128
eventLoopGroup().shutdownGracefully();
129129
}
130130
}
131-
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() );
131+
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
132+
.thenApply( ignore -> null );
132133
}
133134

134135
private ChannelPool getOrCreatePool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.neo4j.driver.internal.RoutingErrorHandler;
2828
import org.neo4j.driver.internal.async.AsyncConnection;
29-
import org.neo4j.driver.internal.async.Futures;
3029
import org.neo4j.driver.internal.async.RoutingAsyncConnection;
3130
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3231
import org.neo4j.driver.internal.cluster.AddressSet;
@@ -52,7 +51,7 @@
5251

5352
import static java.util.concurrent.CompletableFuture.completedFuture;
5453

55-
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
54+
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler
5655
{
5756
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
5857

@@ -131,10 +130,18 @@ public void onWriteFailure( BoltServerAddress address )
131130
}
132131

133132
@Override
134-
public void close() throws Exception
133+
public CompletionStage<Void> close()
135134
{
136-
connections.close();
137-
Futures.getBlocking( asyncConnectionPool.closeAsync() );
135+
try
136+
{
137+
connections.close();
138+
}
139+
catch ( Exception e )
140+
{
141+
throw new RuntimeException( e );
142+
}
143+
144+
return asyncConnectionPool.close();
138145
}
139146

140147
private PooledConnection acquireConnection( AccessMode mode, AddressSet servers )

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* Interface defines a layer used by the driver to obtain connections. It is meant to be the only component that
2828
* differs between "direct" and "routing" driver.
2929
*/
30-
public interface ConnectionProvider extends AutoCloseable
30+
public interface ConnectionProvider
3131
{
3232
/**
3333
* Acquire new {@link PooledConnection pooled connection} for the given {@link AccessMode mode}.
@@ -38,4 +38,6 @@ public interface ConnectionProvider extends AutoCloseable
3838
PooledConnection acquireConnection( AccessMode mode );
3939

4040
CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode );
41+
42+
CompletionStage<Void> close();
4143
}

driver/src/main/java/org/neo4j/driver/v1/Driver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.v1;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
/**
2224
* Accessor for a specific Neo4j graph database.
2325
* <p>
@@ -140,4 +142,6 @@ public interface Driver extends AutoCloseable
140142
* Close all the resources assigned to this driver, including any open connections.
141143
*/
142144
void close();
145+
146+
CompletionStage<Void> closeAsync();
143147
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal;
20+
21+
import org.junit.Test;
22+
23+
import org.neo4j.driver.internal.security.SecurityPlan;
24+
25+
import static java.util.concurrent.CompletableFuture.completedFuture;
26+
import static org.junit.Assert.assertNull;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
30+
import static org.neo4j.driver.internal.async.Futures.getBlocking;
31+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
32+
33+
public class InternalDriverTest
34+
{
35+
@Test
36+
public void shouldCloseSessionFactory()
37+
{
38+
SessionFactory sessionFactory = sessionFactoryMock();
39+
InternalDriver driver = newDriver( sessionFactory );
40+
41+
assertNull( getBlocking( driver.closeAsync() ) );
42+
verify( sessionFactory ).close();
43+
}
44+
45+
@Test
46+
public void shouldNotCloseSessionFactoryMultipleTimes()
47+
{
48+
SessionFactory sessionFactory = sessionFactoryMock();
49+
InternalDriver driver = newDriver( sessionFactory );
50+
51+
assertNull( getBlocking( driver.closeAsync() ) );
52+
assertNull( getBlocking( driver.closeAsync() ) );
53+
assertNull( getBlocking( driver.closeAsync() ) );
54+
55+
verify( sessionFactory ).close();
56+
}
57+
58+
private static InternalDriver newDriver( SessionFactory sessionFactory )
59+
{
60+
return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_LOGGING );
61+
}
62+
63+
private static SessionFactory sessionFactoryMock()
64+
{
65+
SessionFactory sessionFactory = mock( SessionFactory.class );
66+
when( sessionFactory.close() ).thenReturn( completedFuture( null ) );
67+
return sessionFactory;
68+
}
69+
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5858

5959
import static java.util.Arrays.asList;
60+
import static java.util.concurrent.CompletableFuture.completedFuture;
6061
import static junit.framework.TestCase.fail;
6162
import static org.hamcrest.Matchers.containsString;
6263
import static org.junit.Assert.assertEquals;
@@ -364,6 +365,7 @@ private Driver driverWithPool( ConnectionPool pool )
364365
Logging logging = DEV_NULL_LOGGING;
365366
RoutingSettings settings = new RoutingSettings( 10, 5_000, null );
366367
AsyncConnectionPool asyncConnectionPool = mock( AsyncConnectionPool.class );
368+
when( asyncConnectionPool.close() ).thenReturn( completedFuture( null ) );
367369
LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( pool,
368370
asyncConnectionPool, logging );
369371
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, asyncConnectionPool,

driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void setUp() throws Exception
6363
@After
6464
public void tearDown() throws Exception
6565
{
66-
pool.closeAsync();
66+
pool.close();
6767
}
6868

6969
@Test
@@ -104,7 +104,7 @@ public void shouldFailToAcquireWhenPoolClosed() throws Exception
104104
{
105105
AsyncConnection connection = await( pool.acquire( neo4j.address() ) );
106106
await( connection.forceRelease() );
107-
await( pool.closeAsync() );
107+
await( pool.close() );
108108

109109
try
110110
{
@@ -159,8 +159,8 @@ public void shouldCheckIfPoolHasAddress()
159159
@Test
160160
public void shouldNotCloseWhenClosed()
161161
{
162-
assertNull( await( pool.closeAsync() ) );
163-
assertTrue( pool.closeAsync().toCompletableFuture().isDone() );
162+
assertNull( await( pool.close() ) );
163+
assertTrue( pool.close().toCompletableFuture().isDone() );
164164
}
165165

166166
private AsyncConnectionPoolImpl newPool() throws Exception

0 commit comments

Comments
 (0)