diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java index b1e0fd9122..0b576cdd95 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java @@ -24,9 +24,11 @@ import io.netty.channel.pool.ChannelPool; import io.netty.util.concurrent.Future; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.async.BoltServerAddress; @@ -38,6 +40,7 @@ import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.exceptions.ClientException; public class ConnectionPoolImpl implements ConnectionPool { @@ -73,8 +76,9 @@ public CompletionStage acquire( final BoltServerAddress address ) ChannelPool pool = getOrCreatePool( address ); Future connectionFuture = pool.acquire(); - return Futures.asCompletionStage( connectionFuture ).thenApply( channel -> + return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) -> { + processAcquisitionError( error ); assertNotClosed( address, channel, pool ); return new NettyConnection( channel, pool, clock ); } ); @@ -160,6 +164,27 @@ private EventLoopGroup eventLoopGroup() return bootstrap.config().group(); } + private void processAcquisitionError( Throwable error ) + { + Throwable cause = Futures.completionErrorCause( error ); + if ( cause != null ) + { + if ( cause instanceof TimeoutException ) + { + // NettyChannelPool returns future failed with TimeoutException if acquire operation takes more than + // configured time, translate this exception to a prettier one and re-throw + throw new ClientException( + "Unable to acquire connection from the pool within configured maximum time of " + + settings.connectionAcquisitionTimeout() + "ms" ); + } + else + { + // some unknown error happened during connection acquisition, propagate it + throw new CompletionException( cause ); + } + } + } + private void assertNotClosed() { if ( closed.get() ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index 4b69163d3a..455cfb946f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -29,10 +29,10 @@ import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.SessionFactory; import org.neo4j.driver.internal.SessionFactoryImpl; +import org.neo4j.driver.internal.async.BoltServerAddress; import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer; -import org.neo4j.driver.internal.async.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.exceptions.ClientException; @@ -236,6 +236,30 @@ public void describeTo( Description description ) }; } + public static Matcher connectionAcquisitionTimeoutError( int timeoutMillis ) + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( Throwable error ) + { + if ( error instanceof ClientException ) + { + String expectedMessage = "Unable to acquire connection from the pool within " + + "configured maximum time of " + timeoutMillis + "ms"; + return expectedMessage.equals( error.getMessage() ); + } + return false; + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "acquisition timeout error with " + timeoutMillis + "ms" ); + } + }; + } + private static boolean contains( AddressSet set, BoltServerAddress address ) { BoltServerAddress[] addresses = set.toArray(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index 7ef5f20b48..c4e1b470c4 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.v1.integration; +import org.junit.AfterClass; import org.junit.Rule; import org.junit.Test; @@ -35,7 +36,6 @@ import java.util.concurrent.TimeoutException; import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; import org.neo4j.driver.internal.util.FakeClock; @@ -44,8 +44,6 @@ import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -64,9 +62,11 @@ import org.neo4j.driver.v1.util.cc.ClusterMemberRole; import org.neo4j.driver.v1.util.cc.ClusterRule; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -76,6 +76,7 @@ import static org.junit.Assert.fail; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Futures.getBlocking; +import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; import static org.neo4j.driver.v1.Values.parameters; public class CausalClusteringIT @@ -85,6 +86,12 @@ public class CausalClusteringIT @Rule public final ClusterRule clusterRule = new ClusterRule(); + @AfterClass + public static void stopSharedCluster() + { + ClusterRule.stopSharedCluster(); + } + @Test public void shouldExecuteReadAndWritesWhenDriverSuppliedWithAddressOfLeader() throws Exception { @@ -532,6 +539,46 @@ public void shouldNotReuseReadConnectionForWriteTransaction() } } + @Test + public void shouldRespectMaxConnectionPoolSizePerClusterMember() + { + Cluster cluster = clusterRule.getCluster(); + ClusterMember leader = cluster.leader(); + + Config config = Config.build() + .withMaxConnectionPoolSize( 2 ) + .withConnectionAcquisitionTimeout( 42, MILLISECONDS ) + .withLogging( DEV_NULL_LOGGING ) + .toConfig(); + + try ( Driver driver = createDriver( leader.getRoutingUri(), config ) ) + { + Session writeSession1 = driver.session( AccessMode.WRITE ); + writeSession1.beginTransaction(); + + Session writeSession2 = driver.session( AccessMode.WRITE ); + writeSession2.beginTransaction(); + + // should not be possible to acquire more connections towards leader because limit is 2 + Session writeSession3 = driver.session( AccessMode.WRITE ); + try + { + writeSession3.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e, is( connectionAcquisitionTimeoutError( 42 ) ) ); + } + + // should be possible to acquire new connection towards read server + // it's a different machine, not leader, so different max connection pool size limit applies + Session readSession = driver.session( AccessMode.READ ); + Record record = readSession.readTransaction( tx -> tx.run( "RETURN 1" ).single() ); + assertEquals( 1, record.get( 0 ).asInt() ); + } + } + private CompletionStage> combineCursors( StatementResultCursor cursor1, StatementResultCursor cursor2 ) { @@ -702,19 +749,15 @@ else if ( role == ClusterMemberRole.READ_REPLICA ) private Driver createDriver( URI boltUri ) { - Logging devNullLogging = new Logging() - { - @Override - public Logger getLog( String name ) - { - return DevNullLogger.DEV_NULL_LOGGER; - } - }; - Config config = Config.build() - .withLogging( devNullLogging ) + .withLogging( DEV_NULL_LOGGING ) .toConfig(); + return createDriver( boltUri, config ); + } + + private Driver createDriver( URI boltUri, Config config ) + { return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionPoolIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionPoolIT.java index df1e688607..57b42b5672 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionPoolIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ConnectionPoolIT.java @@ -30,6 +30,7 @@ import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; +import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; @@ -37,14 +38,18 @@ import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.util.TestNeo4j; import static java.util.concurrent.TimeUnit.SECONDS; -import static junit.framework.TestCase.fail; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT; +import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; public class ConnectionPoolIT { @@ -113,6 +118,28 @@ public void shouldDisposeChannelsBasedOnMaxLifetime() throws Exception assertTrue( channel2.isActive() ); } + @Test + public void shouldRespectMaxConnectionPoolSize() + { + int maxPoolSize = 3; + Config config = Config.build() + .withMaxConnectionPoolSize( maxPoolSize ) + .withConnectionAcquisitionTimeout( 542, TimeUnit.MILLISECONDS ) + .toConfig(); + + driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config ); + + try + { + startAndCloseTransactions( driver, maxPoolSize + 1 ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e, is( connectionAcquisitionTimeoutError( 542 ) ) ); + } + } + @After public void cleanup() throws Exception { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index d585a5a9db..1c460a5be1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -45,6 +45,7 @@ import org.neo4j.driver.internal.logging.DevNullLogging; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; +import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthToken; @@ -73,6 +74,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -87,6 +89,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.internal.util.Matchers.connectionAcquisitionTimeoutError; import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; import static org.neo4j.driver.v1.Values.parameters; import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; @@ -1296,6 +1299,38 @@ public void shouldConsumePreviousResultBeforeRunningNewQuery() } } + @Test + public void shouldNotRetryOnConnectionAcquisitionTimeout() + { + int maxPoolSize = 3; + Config config = Config.build() + .withMaxConnectionPoolSize( maxPoolSize ) + .withConnectionAcquisitionTimeout( 0, TimeUnit.SECONDS ) + .withMaxTransactionRetryTime( 42, TimeUnit.DAYS ) // retry for a really long time + .toConfig(); + + driver = new DriverFactoryWithOneEventLoopThread().newInstance( neo4j.uri(), neo4j.authToken(), config ); + + for ( int i = 0; i < maxPoolSize; i++ ) + { + driver.session().beginTransaction(); + } + + AtomicInteger invocations = new AtomicInteger(); + try + { + driver.session().writeTransaction( tx -> invocations.incrementAndGet() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e, is( connectionAcquisitionTimeoutError( 0 ) ) ); + } + + // work should never be invoked + assertEquals( 0, invocations.get() ); + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/CausalClusteringStressIT.java b/driver/src/test/java/org/neo4j/driver/v1/stress/CausalClusteringStressIT.java index f4e32a11d7..e819a38dc2 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/stress/CausalClusteringStressIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/CausalClusteringStressIT.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.v1.stress; +import org.junit.AfterClass; import org.junit.Rule; import java.net.URI; @@ -40,6 +41,7 @@ import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.util.cc.ClusterMemberRole; +import org.neo4j.driver.v1.util.cc.ClusterRule; import org.neo4j.driver.v1.util.cc.LocalOrRemoteClusterRule; import static org.hamcrest.Matchers.both; @@ -54,6 +56,12 @@ public class CausalClusteringStressIT extends AbstractStressTestBase