From f3a8f92c3165e19a503a50765870b2ad487546c9 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 3 Oct 2017 14:35:49 +0200 Subject: [PATCH] Expose `Driver#closeAsync()` To allow disposing of the driver and it's resources (including network connections, Netty event loop threads, etc.) in an async manner. --- .../internal/DirectConnectionProvider.java | 15 ++-- .../neo4j/driver/internal/DriverFactory.java | 2 +- .../neo4j/driver/internal/InternalDriver.java | 31 ++++----- .../neo4j/driver/internal/SessionFactory.java | 6 +- .../driver/internal/SessionFactoryImpl.java | 6 +- .../async/pool/AsyncConnectionPool.java | 2 +- .../async/pool/AsyncConnectionPoolImpl.java | 5 +- .../cluster/loadbalancing/LoadBalancer.java | 17 +++-- .../internal/spi/ConnectionProvider.java | 4 +- .../main/java/org/neo4j/driver/v1/Driver.java | 4 ++ .../driver/internal/InternalDriverTest.java | 69 +++++++++++++++++++ .../driver/internal/RoutingDriverTest.java | 2 + .../pool/AsyncConnectionPoolImplTest.java | 8 +-- 13 files changed, 131 insertions(+), 40 deletions(-) create mode 100644 driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index 639dcd75ba..23475bf69b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -21,7 +21,6 @@ import java.util.concurrent.CompletionStage; import org.neo4j.driver.internal.async.AsyncConnection; -import org.neo4j.driver.internal.async.Futures; import org.neo4j.driver.internal.async.pool.AsyncConnectionPool; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -61,10 +60,18 @@ public CompletionStage acquireAsyncConnection( AccessMode mode } @Override - public void close() throws Exception + public CompletionStage close() { - pool.close(); - Futures.getBlocking( asyncPool.closeAsync() ); + // todo: remove this try-catch when blocking API works on top of async + try + { + pool.close(); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } + return asyncPool.close(); } public BoltServerAddress getAddress() diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 55dd3cfe47..19b6ec922b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -92,7 +92,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r try { connectionPool.close(); - Futures.getBlocking( asyncConnectionPool.closeAsync() ); + Futures.getBlocking( asyncConnectionPool.close() ); } catch ( Throwable closeError ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index e3c2ce124d..de80127059 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -18,8 +18,10 @@ */ package org.neo4j.driver.internal; +import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; +import org.neo4j.driver.internal.async.Futures; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; @@ -27,7 +29,7 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; -import static java.lang.String.format; +import static java.util.concurrent.CompletableFuture.completedFuture; public class InternalDriver implements Driver { @@ -95,11 +97,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark ) Session session = sessionFactory.newInstance( mode, bookmark ); if ( closed.get() ) { - // the driver is already closed and we either 1. obtain this session from the old session pool - // or 2. we obtain this session from a new session pool - // For 1. this closeResources will take no effect as everything is already closed. - // For 2. this closeResources will close the new connection pool just created to ensure no resource leak. - closeResources(); + // session does not immediately acquire connection, it is fine to just throw throw driverCloseException(); } return session; @@ -107,11 +105,18 @@ private Session newSession( AccessMode mode, Bookmark bookmark ) @Override public final void close() + { + Futures.getBlocking( closeAsync() ); + } + + @Override + public CompletionStage closeAsync() { if ( closed.compareAndSet( false, true ) ) { - closeResources(); + return sessionFactory.close(); } + return completedFuture( null ); } /** @@ -126,18 +131,6 @@ public final SessionFactory getSessionFactory() return sessionFactory; } - private void closeResources() - { - try - { - sessionFactory.close(); - } - catch ( Exception ex ) - { - log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); - } - } - private void assertOpen() { if ( closed.get() ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java index fefc799536..7233a6d5f1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java @@ -18,10 +18,14 @@ */ package org.neo4j.driver.internal; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; -public interface SessionFactory extends AutoCloseable +public interface SessionFactory { Session newInstance( AccessMode mode, Bookmark bookmark ); + + CompletionStage close(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java index a9070c7911..908fa2ab53 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AccessMode; @@ -57,9 +59,9 @@ protected NetworkSession createSession( ConnectionProvider connectionProvider, R } @Override - public final void close() throws Exception + public final CompletionStage close() { - connectionProvider.close(); + return connectionProvider.close(); } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java index 37a137aeb7..22ab4d1a19 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPool.java @@ -33,5 +33,5 @@ public interface AsyncConnectionPool int activeConnections( BoltServerAddress address ); - CompletionStage closeAsync(); + CompletionStage close(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java index 0d76a71fb1..6148dc8a4b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImpl.java @@ -109,7 +109,7 @@ public int activeConnections( BoltServerAddress address ) } @Override - public CompletionStage closeAsync() + public CompletionStage close() { if ( closed.compareAndSet( false, true ) ) { @@ -128,7 +128,8 @@ public CompletionStage closeAsync() eventLoopGroup().shutdownGracefully(); } } - return Futures.asCompletionStage( eventLoopGroup().terminationFuture() ); + return Futures.asCompletionStage( eventLoopGroup().terminationFuture() ) + .thenApply( ignore -> null ); } private ChannelPool getOrCreatePool( BoltServerAddress address ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 4c913298ee..86d948bc74 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -26,7 +26,6 @@ import org.neo4j.driver.internal.RoutingErrorHandler; import org.neo4j.driver.internal.async.AsyncConnection; -import org.neo4j.driver.internal.async.Futures; import org.neo4j.driver.internal.async.RoutingAsyncConnection; import org.neo4j.driver.internal.async.pool.AsyncConnectionPool; import org.neo4j.driver.internal.cluster.AddressSet; @@ -52,7 +51,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; -public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable +public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler { private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer"; @@ -131,10 +130,18 @@ public void onWriteFailure( BoltServerAddress address ) } @Override - public void close() throws Exception + public CompletionStage close() { - connections.close(); - Futures.getBlocking( asyncConnectionPool.closeAsync() ); + try + { + connections.close(); + } + catch ( Exception e ) + { + throw new RuntimeException( e ); + } + + return asyncConnectionPool.close(); } private PooledConnection acquireConnection( AccessMode mode, AddressSet servers ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java index 3847064589..f7be203630 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java @@ -27,7 +27,7 @@ * Interface defines a layer used by the driver to obtain connections. It is meant to be the only component that * differs between "direct" and "routing" driver. */ -public interface ConnectionProvider extends AutoCloseable +public interface ConnectionProvider { /** * Acquire new {@link PooledConnection pooled connection} for the given {@link AccessMode mode}. @@ -38,4 +38,6 @@ public interface ConnectionProvider extends AutoCloseable PooledConnection acquireConnection( AccessMode mode ); CompletionStage acquireAsyncConnection( AccessMode mode ); + + CompletionStage close(); } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Driver.java b/driver/src/main/java/org/neo4j/driver/v1/Driver.java index 679126dd90..2331d7c2f2 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Driver.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Driver.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.v1; +import java.util.concurrent.CompletionStage; + /** * Accessor for a specific Neo4j graph database. *

@@ -140,4 +142,6 @@ public interface Driver extends AutoCloseable * Close all the resources assigned to this driver, including any open connections. */ void close(); + + CompletionStage closeAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java new file mode 100644 index 0000000000..7f82976528 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalDriverTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Test; + +import org.neo4j.driver.internal.security.SecurityPlan; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.Futures.getBlocking; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; + +public class InternalDriverTest +{ + @Test + public void shouldCloseSessionFactory() + { + SessionFactory sessionFactory = sessionFactoryMock(); + InternalDriver driver = newDriver( sessionFactory ); + + assertNull( getBlocking( driver.closeAsync() ) ); + verify( sessionFactory ).close(); + } + + @Test + public void shouldNotCloseSessionFactoryMultipleTimes() + { + SessionFactory sessionFactory = sessionFactoryMock(); + InternalDriver driver = newDriver( sessionFactory ); + + assertNull( getBlocking( driver.closeAsync() ) ); + assertNull( getBlocking( driver.closeAsync() ) ); + assertNull( getBlocking( driver.closeAsync() ) ); + + verify( sessionFactory ).close(); + } + + private static InternalDriver newDriver( SessionFactory sessionFactory ) + { + return new InternalDriver( SecurityPlan.insecure(), sessionFactory, DEV_NULL_LOGGING ); + } + + private static SessionFactory sessionFactoryMock() + { + SessionFactory sessionFactory = mock( SessionFactory.class ); + when( sessionFactory.close() ).thenReturn( completedFuture( null ) ); + return sessionFactory; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 3f2273aab6..94831aa3fb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -57,6 +57,7 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.util.Arrays.asList; +import static java.util.concurrent.CompletableFuture.completedFuture; import static junit.framework.TestCase.fail; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; @@ -364,6 +365,7 @@ private Driver driverWithPool( ConnectionPool pool ) Logging logging = DEV_NULL_LOGGING; RoutingSettings settings = new RoutingSettings( 10, 5_000, null ); AsyncConnectionPool asyncConnectionPool = mock( AsyncConnectionPool.class ); + when( asyncConnectionPool.close() ).thenReturn( completedFuture( null ) ); LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( pool, asyncConnectionPool, logging ); ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, asyncConnectionPool, diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java index 81431a1281..23dda20570 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/AsyncConnectionPoolImplTest.java @@ -63,7 +63,7 @@ public void setUp() throws Exception @After public void tearDown() throws Exception { - pool.closeAsync(); + pool.close(); } @Test @@ -104,7 +104,7 @@ public void shouldFailToAcquireWhenPoolClosed() throws Exception { AsyncConnection connection = await( pool.acquire( neo4j.address() ) ); await( connection.forceRelease() ); - await( pool.closeAsync() ); + await( pool.close() ); try { @@ -159,8 +159,8 @@ public void shouldCheckIfPoolHasAddress() @Test public void shouldNotCloseWhenClosed() { - assertNull( await( pool.closeAsync() ) ); - assertTrue( pool.closeAsync().toCompletableFuture().isDone() ); + assertNull( await( pool.close() ) ); + assertTrue( pool.close().toCompletableFuture().isDone() ); } private AsyncConnectionPoolImpl newPool() throws Exception