From e2082b523262ea1430c3866586a41ab28cb8306b Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 23 Dec 2016 23:43:25 +0100 Subject: [PATCH 1/6] Configurable test-on-borrow for pooled connections Connections are pooled for both direct and routing driver. Each acquired session contains a socket connection which was acquired from the pool. It is possible for an idle pooled connection to become invalid while just resting in the pool. This could happen when connection is terminated after some timeout by a load balancer or some other network facility. This commit introduces a configurable connection liveness check timeout. So freshly acquired connection, that has been idle in the pool for more than configurable timeout, will be validated by sending a RESET message. If it appears to be broken acquisition will retry until a valid connection is found. --- .../neo4j/driver/internal/DriverFactory.java | 47 +++- .../internal/net/pooling/PoolSettings.java | 25 +- .../net/pooling/PooledConnection.java | 21 +- .../PooledConnectionReleaseConsumer.java | 12 +- .../pooling/PooledConnectionValidator.java | 25 +- .../net/pooling/SocketConnectionPool.java | 103 +++++--- .../internal/spi/ConnectionValidator.java | 26 ++ .../main/java/org/neo4j/driver/v1/Config.java | 60 +++-- .../org/neo4j/driver/internal/ConfigTest.java | 40 ++- .../driver/internal/DriverFactoryTest.java | 20 +- .../net/pooling/PoolSettingsTest.java | 67 +++++ .../net/pooling/PooledConnectionTest.java | 72 ++++-- .../PooledConnectionValidatorTest.java | 110 ++++++++- .../net/pooling/SocketConnectionPoolTest.java | 233 +++++++++++++++++- .../util/ConnectionTrackingConnector.java | 45 ++++ .../util/ConnectionTrackingDriverFactory.java | 57 +++++ .../internal/util/DriverFactoryWithClock.java | 37 +++ .../neo4j/driver/internal/util/FakeClock.java | 13 +- .../v1/integration/CausalClusteringIT.java | 83 +++++++ .../driver/v1/integration/ServerKilledIT.java | 75 ++++-- 20 files changed, 1010 insertions(+), 161 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 58cbeec53e..dbd7e18d73 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -36,6 +36,7 @@ import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ClientException; import static java.lang.String.format; @@ -91,10 +92,10 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio /** * Creates new {@link DirectDriver}. *

- * This method is package-private only for testing + * This method is protected only for testing */ - DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) { return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() ); } @@ -102,29 +103,51 @@ DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool conne /** * Creates new {@link RoutingDriver}. *

- * This method is package-private only for testing + * This method is protected only for testing */ - RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory ) { return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory, - Clock.SYSTEM, config.logging() ); + createClock(), config.logging() ); } /** * Creates new {@link ConnectionPool}. *

- * This method is package-private only for testing + * This method is protected only for testing */ - ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config ) { authToken = authToken == null ? AuthTokens.none() : authToken; ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() ); - PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize() ); - Connector connector = new SocketConnector( connectionSettings, securityPlan, config.logging() ); + PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(), + config.idleTimeBeforeConnectionTest() ); + Connector connector = createConnector( connectionSettings, securityPlan, config.logging() ); - return new SocketConnectionPool( poolSettings, connector, Clock.SYSTEM, config.logging() ); + return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() ); + } + + /** + * Creates new {@link Clock}. + *

+ * This method is protected only for testing + */ + protected Clock createClock() + { + return Clock.SYSTEM; + } + + /** + * Creates new {@link Connector}. + *

+ * This method is protected only for testing + */ + protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, + Logging logging ) + { + return new SocketConnector( connectionSettings, securityPlan, logging ); } private static SessionFactory createSessionFactory( Config config ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java index 36ecda1a3d..a1d738d623 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java @@ -20,20 +20,37 @@ public class PoolSettings { + public static final int NO_IDLE_CONNECTION_TEST = -1; + public static final int DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE = 10; + public static final int DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST = NO_IDLE_CONNECTION_TEST; - /** - * Maximum number of idle connections per pool. - */ private final int maxIdleConnectionPoolSize; + private final long idleTimeBeforeConnectionTest; - public PoolSettings( int maxIdleConnectionPoolSize ) + public PoolSettings( int maxIdleConnectionPoolSize, long idleTimeBeforeConnectionTest ) { this.maxIdleConnectionPoolSize = maxIdleConnectionPoolSize; + this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTest; } public int maxIdleConnectionPoolSize() { return maxIdleConnectionPoolSize; } + + public long idleTimeBeforeConnectionTest() + { + if ( !idleTimeBeforeConnectionTestConfigured() ) + { + throw new IllegalStateException( + "Idle time before connection test is not configured: " + idleTimeBeforeConnectionTest ); + } + return idleTimeBeforeConnectionTest; + } + + public boolean idleTimeBeforeConnectionTestConfigured() + { + return idleTimeBeforeConnectionTest > 0; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java index 9bbd58282f..bae48024c2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java @@ -59,19 +59,14 @@ public class PooledConnection implements Connection private boolean unrecoverableErrorsOccurred = false; private Runnable onError = null; private final Clock clock; - private long lastUsed; + private long lastUsedTimestamp; public PooledConnection( Connection delegate, Consumer release, Clock clock ) { this.delegate = delegate; this.release = release; this.clock = clock; - this.lastUsed = clock.millis(); - } - - public void updateTimestamp() - { - lastUsed = clock.millis(); + updateLastUsedTimestamp(); } @Override @@ -192,13 +187,14 @@ public void receiveOne() } } - @Override /** * Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a. * adding back the connection twice into the pool. */ + @Override public void close() { + updateLastUsedTimestamp(); release.accept( this ); // put the full logic of deciding whether to dispose the connection or to put it back to // the pool into the release object @@ -286,6 +282,11 @@ public void onError( Runnable runnable ) this.onError = runnable; } + public long lastUsedTimestamp() + { + return lastUsedTimestamp; + } + private boolean isProtocolViolationError(RuntimeException e ) { return e instanceof Neo4jException @@ -300,8 +301,8 @@ private boolean isClientOrTransientError( RuntimeException e ) || ((Neo4jException) e).code().contains( "TransientError" )); } - public long idleTime() + private void updateLastUsedTimestamp() { - return clock.millis() - lastUsed; + this.lastUsedTimestamp = clock.millis(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index 93dcaf59e1..618796ca98 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -18,10 +18,8 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.concurrent.atomic.AtomicBoolean; - +import org.neo4j.driver.internal.spi.ConnectionValidator; import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.util.Function; /** * The responsibility of the PooledConnectionReleaseConsumer is to release valid connections @@ -30,19 +28,19 @@ class PooledConnectionReleaseConsumer implements Consumer { private final BlockingPooledConnectionQueue connections; - private final Function validConnection; + private final ConnectionValidator connectionValidator; PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, - Function validConnection) + ConnectionValidator connectionValidator ) { this.connections = connections; - this.validConnection = validConnection; + this.connectionValidator = connectionValidator; } @Override public void accept( PooledConnection pooledConnection ) { - if ( validConnection.apply( pooledConnection ) ) + if ( connectionValidator.isReusable( pooledConnection ) ) { connections.offer( pooledConnection ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java index 107c91a850..5480cc84fb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java @@ -19,9 +19,9 @@ package org.neo4j.driver.internal.net.pooling; import org.neo4j.driver.internal.spi.ConnectionPool; -import org.neo4j.driver.v1.util.Function; +import org.neo4j.driver.internal.spi.ConnectionValidator; -class PooledConnectionValidator implements Function +class PooledConnectionValidator implements ConnectionValidator { private final ConnectionPool pool; @@ -31,28 +31,25 @@ class PooledConnectionValidator implements Function } @Override - public Boolean apply( PooledConnection pooledConnection ) + public boolean isReusable( PooledConnection pooledConnection ) { // once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error // and we should close the conn without bothering to reset the conn at all return pool.hasAddress( pooledConnection.boltServerAddress() ) && !pooledConnection.hasUnrecoverableErrors() && - reset( pooledConnection ); + isConnected( pooledConnection ); } - /** - * In case this session has an open result or transaction or something, - * make sure it's reset to a nice state before we reuse it. - * - * @param conn the PooledConnection - * @return true if the connection is reset successfully without any error, otherwise false. - */ - private static boolean reset( PooledConnection conn ) + @Override + public boolean isConnected( PooledConnection connection ) { try { - conn.reset(); - conn.sync(); + // try to use this connection for RESET message + // in case this session has an open result or transaction or something, + // make sure it's reset to a nice state before we reuse it. + connection.reset(); + connection.sync(); return true; } catch ( Throwable e ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index a8bc442f4e..2e0f5b8489 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -19,11 +19,13 @@ package org.neo4j.driver.internal.net.pooling; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionValidator; import org.neo4j.driver.internal.spi.Connector; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; @@ -46,13 +48,13 @@ public class SocketConnectionPool implements ConnectionPool /** * Pools, organized by server address. */ - private final ConcurrentHashMap pools = - new ConcurrentHashMap<>(); + private final ConcurrentMap pools = new ConcurrentHashMap<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final PoolSettings poolSettings; private final Connector connector; + private final ConnectionValidator connectionValidator; private final Clock clock; private final Logging logging; @@ -60,6 +62,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo { this.poolSettings = poolSettings; this.connector = connector; + this.connectionValidator = new PooledConnectionValidator( this ); this.clock = clock; this.logging = logging; } @@ -68,26 +71,41 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo public Connection acquire( final BoltServerAddress address ) { assertNotClosed(); + BlockingPooledConnectionQueue connectionQueue = pool( address ); + PooledConnection connection = acquireConnection( address, connectionQueue ); + assertNotClosed( address, connectionQueue ); - final BlockingPooledConnectionQueue connections = pool( address ); - Supplier supplier = new Supplier() + return connection; + } + + @Override + public void purge( BoltServerAddress address ) + { + BlockingPooledConnectionQueue connections = pools.remove( address ); + if ( connections != null ) { - @Override - public PooledConnection get() + connections.terminate(); + } + } + + @Override + public boolean hasAddress( BoltServerAddress address ) + { + return pools.containsKey( address ); + } + + @Override + public void close() + { + if ( closed.compareAndSet( false, true ) ) + { + for ( BlockingPooledConnectionQueue pool : pools.values() ) { - PooledConnectionValidator connectionValidator = - new PooledConnectionValidator( SocketConnectionPool.this ); - PooledConnectionReleaseConsumer releaseConsumer = - new PooledConnectionReleaseConsumer( connections, connectionValidator ); - return new PooledConnection( connector.connect( address ), releaseConsumer, clock ); + pool.terminate(); } - }; - PooledConnection conn = connections.acquire( supplier ); - - assertNotClosed( address, connections ); - conn.updateTimestamp(); - return conn; + pools.clear(); + } } private BlockingPooledConnectionQueue pool( BoltServerAddress address ) @@ -106,34 +124,57 @@ private BlockingPooledConnectionQueue pool( BoltServerAddress address ) return pool; } - @Override - public void purge( BoltServerAddress address ) + private PooledConnection acquireConnection( final BoltServerAddress address, + final BlockingPooledConnectionQueue connectionQueue ) { - BlockingPooledConnectionQueue connections = pools.remove( address ); - if ( connections != null ) + Supplier supplier = newPooledConnectionSupplier( address, connectionQueue ); + + int acquisitionAttempt = 1; + PooledConnection connection = connectionQueue.acquire( supplier ); + while ( !canBeAcquired( connection, acquisitionAttempt ) ) { - connections.terminate(); + connection = connectionQueue.acquire( supplier ); + acquisitionAttempt++; } + return connection; } - @Override - public boolean hasAddress( BoltServerAddress address ) + private Supplier newPooledConnectionSupplier( final BoltServerAddress address, + final BlockingPooledConnectionQueue connectionQueue ) { - return pools.containsKey( address ); + return new Supplier() + { + @Override + public PooledConnection get() + { + PooledConnectionReleaseConsumer releaseConsumer = + new PooledConnectionReleaseConsumer( connectionQueue, connectionValidator ); + return new PooledConnection( connector.connect( address ), releaseConsumer, clock ); + } + }; } - @Override - public void close() + private boolean canBeAcquired( PooledConnection connection, int acquisitionAttempt ) { - if ( closed.compareAndSet( false, true ) ) + if ( poolSettings.idleTimeBeforeConnectionTestConfigured() ) { - for ( BlockingPooledConnectionQueue pool : pools.values() ) + if ( acquisitionAttempt > poolSettings.maxIdleConnectionPoolSize() ) { - pool.terminate(); + return true; } - pools.clear(); + if ( hasBeenIdleForTooLong( connection ) ) + { + return connectionValidator.isConnected( connection ); + } } + return true; + } + + private boolean hasBeenIdleForTooLong( PooledConnection connection ) + { + long idleTime = clock.millis() - connection.lastUsedTimestamp(); + return idleTime > poolSettings.idleTimeBeforeConnectionTest(); } private void assertNotClosed( BoltServerAddress address, BlockingPooledConnectionQueue connections ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java new file mode 100644 index 0000000000..652ddcbce1 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.spi; + +public interface ConnectionValidator +{ + boolean isReusable( T connection ); + + boolean isConnected( T connection ); +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 6db54a2287..cbe608f056 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -53,6 +53,12 @@ public class Config private final int maxIdleConnectionPoolSize; + /** + * Connections that have been idle in the pool longer than this threshold will + * be tested for validity before being returned to the user. + */ + private final long idleTimeBeforeConnectionTest; + /** Level of encryption we need to adhere to */ private final EncryptionLevel encryptionLevel; @@ -68,6 +74,7 @@ private Config( ConfigBuilder builder) this.logging = builder.logging; this.logLeakedSessions = builder.logLeakedSessions; + this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest; this.maxIdleConnectionPoolSize = builder.maxIdleConnectionPoolSize; this.encryptionLevel = builder.encryptionLevel; @@ -116,17 +123,14 @@ public int maxIdleConnectionPoolSize() } /** - * Pooled connections that have been unused for longer than this timeout will be tested before they are - * used again, to ensure they are still live. + * Pooled sessions that have been idle in the pool for longer than this timeout + * will be tested before they are used again, to ensure they are still live. * * @return idle time in milliseconds - * @deprecated pooled sessions are automatically checked for validity before being returned to the pool. This - * method will always return -1 and will be possibly removed in future. */ - @Deprecated public long idleTimeBeforeConnectionTest() { - return -1; + return idleTimeBeforeConnectionTest; } /** @@ -183,6 +187,7 @@ public static class ConfigBuilder private Logging logging = new JULogging( Level.INFO ); private boolean logLeakedSessions; private int maxIdleConnectionPoolSize = PoolSettings.DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE; + private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST; private EncryptionLevel encryptionLevel = EncryptionLevel.REQUIRED; private TrustStrategy trustStrategy = trustAllCertificates(); private int routingFailureLimit = 1; @@ -256,30 +261,53 @@ public ConfigBuilder withMaxIdleSessions( int size ) } /** - * Pooled sessions that have been unused for longer than this timeout - * will be tested before they are used again, to ensure they are still live. + * Please use {@link #withSessionLivenessCheckTimeout(long, TimeUnit)}. * + * @param timeout minimum idle time in milliseconds + * @return this builder + * @see #withSessionLivenessCheckTimeout(long, TimeUnit) + * @deprecated please use overloaded method with {@link TimeUnit} parameter. This method will be removed in + * future release. + */ + @Deprecated + public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) + { + withSessionLivenessCheckTimeout( timeout, TimeUnit.MILLISECONDS ); + return this; + } + + /** + * Pooled sessions that have been idle in the pool for longer than this timeout + * will be tested before they are used again, to ensure they are still live. + *

* If this option is set too low, an additional network call will be * incurred when acquiring a session, which causes a performance hit. - * + *

* If this is set high, you may receive sessions that are no longer live, * which will lead to exceptions in your application. Assuming the * database is running, these exceptions will go away if you retry acquiring * sessions. - * + *

* Hence, this parameter tunes a balance between the likelihood of your * application seeing connection problems, and performance. - * + *

* You normally should not need to tune this parameter. + * This feature is turned off by default. * - * @param timeout minimum idle time in milliseconds + * @param value the minimum idle time in milliseconds + * @param unit the unit in which the duration is given * @return this builder - * @deprecated pooled sessions are automatically checked for validity before being returned to the pool. - * This setting will be ignored and possibly removed in future. */ - @Deprecated - public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) + public ConfigBuilder withSessionLivenessCheckTimeout( long value, TimeUnit unit ) { + long idleTimeBeforeConnectionTestMillis = unit.toMillis( value ); + if ( idleTimeBeforeConnectionTestMillis <= 0 ) + { + throw new IllegalArgumentException( String.format( + "The timeout value must be positive when converted to ms, but was %d. Given %d %s", + idleTimeBeforeConnectionTestMillis, value, unit ) ); + } + this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTestMillis; return this; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java index 3f0c3ef293..954e0dd86c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java @@ -83,13 +83,43 @@ public void shouldChangeToTrustedCert() } @Test - public void shouldIgnoreLivenessCheckTimeoutSetting() throws Throwable + public void shouldSupportLivenessCheckTimeoutSetting() throws Throwable { - // when - Config config = Config.build().withSessionLivenessCheckTimeout( 1337 ).toConfig(); + Config config = Config.build().withSessionLivenessCheckTimeout( 42, TimeUnit.SECONDS ).toConfig(); - // then - assertEquals( -1, config.idleTimeBeforeConnectionTest() ); + assertEquals( TimeUnit.SECONDS.toMillis( 42 ), config.idleTimeBeforeConnectionTest() ); + } + + @Test + public void shouldThrowForZeroTimeoutInLivenessCheckTimeoutSetting() throws Throwable + { + Config.ConfigBuilder builder = Config.build(); + + try + { + builder.withSessionLivenessCheckTimeout( 0, TimeUnit.SECONDS ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalArgumentException.class ) ); + } + } + + @Test + public void shouldThrowForNegativeTimeoutInLivenessCheckTimeoutSetting() throws Throwable + { + Config.ConfigBuilder builder = Config.build(); + + try + { + builder.withSessionLivenessCheckTimeout( -42, TimeUnit.SECONDS ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalArgumentException.class ) ); + } } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 2c6efe7ccd..564d81e340 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -142,21 +142,22 @@ private static class ThrowingDriverFactory extends DriverFactory } @Override - DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) { throw new UnsupportedOperationException( "Can't create direct driver" ); } @Override - RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, + SessionFactory sessionFactory ) { throw new UnsupportedOperationException( "Can't create routing driver" ); } @Override - ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config ) + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config ) { return connectionPool; } @@ -167,16 +168,17 @@ private static class SessionFactoryCapturingDriverFactory extends DriverFactory SessionFactory capturedSessionFactory; @Override - DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) { capturedSessionFactory = sessionFactory; return null; } @Override - RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, + SessionFactory sessionFactory ) { capturedSessionFactory = sessionFactory; return null; diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java new file mode 100644 index 0000000000..700cd6a8d9 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import org.junit.Test; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class PoolSettingsTest +{ + @Test + public void idleTimeBeforeConnectionTestWhenConfigured() + { + PoolSettings settings = new PoolSettings( 10, 42 ); + assertTrue( settings.idleTimeBeforeConnectionTestConfigured() ); + assertEquals( 42, settings.idleTimeBeforeConnectionTest() ); + } + + @Test + public void idleTimeBeforeConnectionTestWhenSetToZero() + { + testWithIllegalValue( 0 ); + } + + @Test + public void idleTimeBeforeConnectionTestWhenSetToNegativeValue() + { + testWithIllegalValue( -1 ); + testWithIllegalValue( -42 ); + } + + private static void testWithIllegalValue( int value ) + { + PoolSettings settings = new PoolSettings( 10, value ); + + assertFalse( settings.idleTimeBeforeConnectionTestConfigured() ); + + try + { + settings.idleTimeBeforeConnectionTest(); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java index 11525fb32d..9800b25e0c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java @@ -21,14 +21,15 @@ import org.junit.Test; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionValidator; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.util.Function; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -43,24 +44,8 @@ public class PooledConnectionTest { - private static final Function - VALID_CONNECTION = new Function() - { - @Override - public Boolean apply( PooledConnection pooledConnection ) - { - return true; - } - }; - private static final Function - INVALID_CONNECTION = new Function() - { - @Override - public Boolean apply( PooledConnection pooledConnection ) - { - return false; - } - }; + private static final ConnectionValidator VALID_CONNECTION = newFixedValidator( true, true ); + private static final ConnectionValidator INVALID_CONNECTION = newFixedValidator( false, false ); @Test public void shouldDisposeConnectionIfNotValidConnection() throws Throwable @@ -362,8 +347,57 @@ public void shouldThrowExceptionIfFailureReceivedForAckFailure() assertThat( pooledConnection.hasUnrecoverableErrors(), equalTo( true ) ); } + @Test + public void hasNewLastUsedTimestampWhenCreated() + { + PooledConnectionReleaseConsumer releaseConsumer = mock( PooledConnectionReleaseConsumer.class ); + Clock clock = when( mock( Clock.class ).millis() ).thenReturn( 42L ).getMock(); + + PooledConnection connection = new PooledConnection( mock( Connection.class ), releaseConsumer, clock ); + + assertEquals( 42L, connection.lastUsedTimestamp() ); + } + + @Test + public void lastUsedTimestampUpdatedWhenConnectionClosed() + { + PooledConnectionReleaseConsumer releaseConsumer = mock( PooledConnectionReleaseConsumer.class ); + Clock clock = when( mock( Clock.class ).millis() ) + .thenReturn( 42L ).thenReturn( 4242L ).thenReturn( 424242L ).getMock(); + + PooledConnection connection = new PooledConnection( mock( Connection.class ), releaseConsumer, clock ); + + assertEquals( 42, connection.lastUsedTimestamp() ); + + connection.close(); + assertEquals( 4242, connection.lastUsedTimestamp() ); + + connection.close(); + assertEquals( 424242, connection.lastUsedTimestamp() ); + } + private static BlockingPooledConnectionQueue newConnectionQueue( int capacity ) { return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, capacity, mock( Logging.class, RETURNS_MOCKS ) ); } + + private static ConnectionValidator newFixedValidator( final boolean reusable, + final boolean connected ) + { + return new ConnectionValidator() + { + + @Override + public boolean isReusable( PooledConnection connection ) + { + return reusable; + } + + @Override + public boolean isConnected( PooledConnection connection ) + { + return connected; + } + }; + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java index 435cb8d50a..1c336a7eb6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidatorTest.java @@ -23,23 +23,32 @@ import org.mockito.InOrder; import java.io.IOException; +import java.util.Map; import java.util.Queue; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.net.SocketClient; import org.neo4j.driver.internal.net.SocketConnection; +import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.summary.InternalServerInfo; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumers; +import org.neo4j.driver.v1.exceptions.DatabaseException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; @@ -49,13 +58,54 @@ public class PooledConnectionValidatorTest { @Test - public void resetAndSyncValidConnection() + public void isNotReusableWhenPoolHasNoAddress() { Connection connection = mock( Connection.class ); PooledConnection pooledConnection = newPooledConnection( connection ); - PooledConnectionValidator validator = newValidatorWithMockedPool(); - boolean connectionIsValid = validator.apply( pooledConnection ); + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( false ) ); + + assertFalse( validator.isReusable( pooledConnection ) ); + verify( connection, never() ).reset(); + verify( connection, never() ).sync(); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void isNotReusableWhenHasUnrecoverableErrors() + { + Connection connection = mock( Connection.class ); + DatabaseException runError = new DatabaseException( "", "" ); + doThrow( runError ).when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); + + PooledConnection pooledConnection = newPooledConnection( connection ); + + try + { + pooledConnection.run( "BEGIN", null, null ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertSame( runError, e ); + } + assertTrue( pooledConnection.hasUnrecoverableErrors() ); + + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + + assertFalse( validator.isReusable( pooledConnection ) ); + verify( connection, never() ).reset(); + verify( connection, never() ).sync(); + } + + @Test + public void resetAndSyncValidConnectionWhenCheckingIfReusable() + { + Connection connection = mock( Connection.class ); + PooledConnection pooledConnection = newPooledConnection( connection ); + + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + boolean connectionIsValid = validator.isReusable( pooledConnection ); assertTrue( connectionIsValid ); @@ -65,15 +115,15 @@ public void resetAndSyncValidConnection() } @Test - public void sendsSingleResetMessageForValidConnection() throws IOException + public void sendsSingleResetMessageForValidConnectionWhenCheckingIfReusable() throws IOException { SocketClient socket = mock( SocketClient.class ); InternalServerInfo serverInfo = new InternalServerInfo( LOCAL_DEFAULT, "v1" ); Connection connection = new SocketConnection( socket, serverInfo, DEV_NULL_LOGGER ); PooledConnection pooledConnection = newPooledConnection( connection ); - PooledConnectionValidator validator = newValidatorWithMockedPool(); - boolean connectionIsValid = validator.apply( pooledConnection ); + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + boolean connectionIsValid = validator.isReusable( pooledConnection ); assertTrue( connectionIsValid ); @@ -85,20 +135,56 @@ public void sendsSingleResetMessageForValidConnection() throws IOException assertEquals( RESET, messages.peek() ); } - private static PooledConnection newPooledConnection( Connection connection ) + @Test + public void isConnectedReturnsFalseWhenResetFails() { - return new PooledConnection( connection, Consumers.noOp(), Clock.SYSTEM ); + Connection connection = mock( Connection.class ); + doThrow( new RuntimeException() ).when( connection ).reset(); + PooledConnection pooledConnection = newPooledConnection( connection ); + + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + + assertFalse( validator.isConnected( pooledConnection ) ); + verify( connection ).reset(); + verify( connection, never() ).sync(); } - private static PooledConnectionValidator newValidatorWithMockedPool() + @Test + public void isConnectedReturnsFalseWhenSyncFails() { - return new PooledConnectionValidator( connectionPoolMock() ); + Connection connection = mock( Connection.class ); + doThrow( new RuntimeException() ).when( connection ).sync(); + PooledConnection pooledConnection = newPooledConnection( connection ); + + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + + assertFalse( validator.isConnected( pooledConnection ) ); + verify( connection ).reset(); + verify( connection ).sync(); + } + + @Test + public void isConnectedReturnsTrueWhenUnderlyingConnectionWorks() + { + Connection connection = mock( Connection.class ); + PooledConnection pooledConnection = newPooledConnection( connection ); + + PooledConnectionValidator validator = new PooledConnectionValidator( connectionPoolMock( true ) ); + + assertTrue( validator.isConnected( pooledConnection ) ); + verify( connection ).reset(); + verify( connection ).sync(); + } + + private static PooledConnection newPooledConnection( Connection connection ) + { + return new PooledConnection( connection, Consumers.noOp(), Clock.SYSTEM ); } - private static ConnectionPool connectionPoolMock() + private static ConnectionPool connectionPoolMock( boolean knowsAddressed ) { ConnectionPool pool = mock( ConnectionPool.class ); - when( pool.hasAddress( any( BoltServerAddress.class ) ) ).thenReturn( true ); + when( pool.hasAddress( any( BoltServerAddress.class ) ) ).thenReturn( knowsAddressed ); return pool; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index e3e2f5f8fe..8912193233 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.net.pooling; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -37,23 +38,31 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Logging; import static java.util.Collections.newSetFromMap; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; -import static org.neo4j.driver.internal.util.Clock.SYSTEM; public class SocketConnectionPoolTest { @@ -274,6 +283,213 @@ public void closeWithConcurrentAcquisitionsEmptiesThePool() throws InterruptedEx } } + @Test + public void recentlyUsedConnectionNotValidatedDuringAcquisition() + { + long idleTimeBeforeConnectionTest = 100; + long creationTimestamp = 42; + long closedAfterMs = 10; + long acquiredAfterMs = 20; + + Connection connection = newConnectionMock( ADDRESS_1 ); + + FakeClock clock = new FakeClock(); + SocketConnectionPool pool = newPool( newMockConnector( connection ), clock, idleTimeBeforeConnectionTest ); + + clock.progress( creationTimestamp ); + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + verify( connection, never() ).reset(); + verify( connection, never() ).sync(); + + // return to the pool + clock.progress( closedAfterMs ); + acquiredConnection1.close(); + verify( connection ).reset(); + verify( connection ).sync(); + + clock.progress( acquiredAfterMs ); + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + assertSame( acquiredConnection1, acquiredConnection2 ); + + // reset & sync were called only when pooled connection was closed previously + verify( connection ).reset(); + verify( connection ).sync(); + } + + @Test + public void connectionThatWasIdleForALongTimeIsValidatedDuringAcquisition() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + long idleTimeBeforeConnectionTest = 100; + FakeClock clock = new FakeClock(); + + SocketConnectionPool pool = newPool( newMockConnector( connection ), clock, idleTimeBeforeConnectionTest ); + + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + verify( connection, never() ).reset(); + verify( connection, never() ).sync(); + + // return to the pool + acquiredConnection1.close(); + verify( connection ).reset(); + verify( connection ).sync(); + + clock.progress( idleTimeBeforeConnectionTest + 42 ); + + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + assertSame( acquiredConnection1, acquiredConnection2 ); + + // reset & sync were called only when pooled connection was closed previously + verify( connection, times( 2 ) ).reset(); + verify( connection, times( 2 ) ).sync(); + } + + @Test + public void connectionThatWasIdleForALongTimeIsNotValidatedDuringAcquisitionWhenTimeoutNotConfigured() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + long idleTimeBeforeConnectionTest = PoolSettings.NO_IDLE_CONNECTION_TEST; + FakeClock clock = new FakeClock(); + + SocketConnectionPool pool = newPool( newMockConnector( connection ), clock, idleTimeBeforeConnectionTest ); + + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + verify( connection, never() ).reset(); + verify( connection, never() ).sync(); + + // return to the pool + acquiredConnection1.close(); + verify( connection ).reset(); + verify( connection ).sync(); + + clock.progress( 1000 ); + + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + assertSame( acquiredConnection1, acquiredConnection2 ); + verify( connection ).reset(); + verify( connection ).sync(); + } + + @Test + public void brokenConnectionsSkippedDuringAcquisition() + { + Connection connection1 = newConnectionMock( ADDRESS_1 ); + Connection connection2 = newConnectionMock( ADDRESS_1 ); + Connection connection3 = newConnectionMock( ADDRESS_1 ); + + doNothing().doThrow( new RuntimeException( "failed to reset" ) ).when( connection1 ).reset(); + doNothing().doThrow( new RuntimeException( "failed to sync" ) ).when( connection2 ).sync(); + + + int idleTimeBeforeConnectionTest = 10; + FakeClock clock = new FakeClock(); + Connector connector = newMockConnector( connection1, connection2, connection3 ); + SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest ); + + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection3 = pool.acquire( ADDRESS_1 ); + + // return acquired connections to the pool + acquiredConnection1.close(); + acquiredConnection2.close(); + acquiredConnection3.close(); + + clock.progress( idleTimeBeforeConnectionTest + 1 ); + + Connection acquiredConnection = pool.acquire( ADDRESS_1 ); + acquiredConnection.reset(); + acquiredConnection.sync(); + assertSame( acquiredConnection3, acquiredConnection ); + } + + @Test + public void limitedNumberOfBrokenConnectionsIsSkippedDuringAcquisition() + { + Connection connection1 = newConnectionMock( ADDRESS_1 ); + Connection connection2 = newConnectionMock( ADDRESS_1 ); + Connection connection3 = newConnectionMock( ADDRESS_1 ); + Connection connection4 = newConnectionMock( ADDRESS_1 ); + + doNothing().doThrow( new RuntimeException( "failed to reset 1" ) ).when( connection1 ).reset(); + doNothing().doThrow( new RuntimeException( "failed to sync 2" ) ).when( connection2 ).sync(); + doNothing().doThrow( new RuntimeException( "failed to reset 3" ) ).when( connection3 ).reset(); + RuntimeException recentlyUsedConnectionFailure = new RuntimeException( "failed to sync 4" ); + doNothing().doThrow( recentlyUsedConnectionFailure ).when( connection4 ).sync(); + + int idleTimeBeforeConnectionTest = 10; + FakeClock clock = new FakeClock(); + Connector connector = newMockConnector( connection1, connection2, connection3, connection4 ); + SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest ); + + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection3 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection4 = pool.acquire( ADDRESS_1 ); + + acquiredConnection1.close(); + acquiredConnection2.close(); + acquiredConnection3.close(); + clock.progress( idleTimeBeforeConnectionTest + 1 ); + acquiredConnection4.close(); + + Connection acquiredConnection = pool.acquire( ADDRESS_1 ); + acquiredConnection.reset(); + try + { + acquiredConnection.sync(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertSame( recentlyUsedConnectionFailure, e ); + } + assertSame( acquiredConnection4, acquiredConnection ); + } + + @Test + public void acquireRetriesAtMostMaxPoolSizeTimes() + { + Connection connection1 = newConnectionMock( ADDRESS_1 ); + Connection connection2 = newConnectionMock( ADDRESS_1 ); + Connection connection3 = newConnectionMock( ADDRESS_1 ); + Connection connection4 = newConnectionMock( ADDRESS_1 ); + + doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset(); + doNothing().doThrow( new RuntimeException() ).when( connection2 ).reset(); + doNothing().doThrow( new RuntimeException() ).when( connection3 ).reset(); + + int maxIdleConnectionPoolSize = 3; + int idleTimeBeforeConnectionTest = 10; + FakeClock clock = new FakeClock(); + Connector connector = newMockConnector( connection1, connection2, connection3, connection4 ); + SocketConnectionPool pool = + newPool( connector, clock, maxIdleConnectionPoolSize, idleTimeBeforeConnectionTest ); + + Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); + Connection acquiredConnection3 = pool.acquire( ADDRESS_1 ); + + acquiredConnection1.close(); + acquiredConnection2.close(); + acquiredConnection3.close(); + + // make all connections seem idle for too long + clock.progress( idleTimeBeforeConnectionTest + 10 ); + + Connection acquiredConnection = pool.acquire( ADDRESS_1 ); + assertThat( acquiredConnection, + not( isOneOf( acquiredConnection1, acquiredConnection2, acquiredConnection3 ) ) ); + + // all connections were tested and appeared to be broken + InOrder inOrder = inOrder( connection1, connection2, connection3, connection4 ); + inOrder.verify( connection1 ).reset(); + inOrder.verify( connection2 ).reset(); + inOrder.verify( connection3 ).reset(); + inOrder.verify( connection4, never() ).reset(); + inOrder.verify( connection4, never() ).sync(); + } + private static Answer createConnectionAnswer( final Set createdConnections ) { return new Answer() @@ -319,9 +535,20 @@ private static Connector newMockConnector( Connection connection, Connection... private static SocketConnectionPool newPool( Connector connector ) { - PoolSettings poolSettings = new PoolSettings( 42 ); + return newPool( connector, Clock.SYSTEM, 0 ); + } + + private static SocketConnectionPool newPool( Connector connector, Clock clock, long idleTimeBeforeConnectionTest ) + { + return newPool( connector, clock, 42, idleTimeBeforeConnectionTest ); + } + + private static SocketConnectionPool newPool( Connector connector, Clock clock, int maxIdleConnectionPoolSize, + long idleTimeBeforeConnectionTest ) + { + PoolSettings poolSettings = new PoolSettings( maxIdleConnectionPoolSize, idleTimeBeforeConnectionTest ); Logging logging = mock( Logging.class, RETURNS_MOCKS ); - return new SocketConnectionPool( poolSettings, connector, SYSTEM, logging ); + return new SocketConnectionPool( poolSettings, connector, clock, logging ); } private static Connection newConnectionMock( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java new file mode 100644 index 0000000000..a5956f3ac1 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.Set; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.Connector; + +public class ConnectionTrackingConnector implements Connector +{ + private final Connector realConnector; + private final Set connections; + + public ConnectionTrackingConnector( Connector realConnector, Set connections ) + { + this.realConnector = realConnector; + this.connections = connections; + } + + @Override + public Connection connect( BoltServerAddress address ) + { + Connection connection = realConnector.connect( address ); + connections.add( connection ); + return connection; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java new file mode 100644 index 0000000000..568ad42dc8 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.v1.Logging; + +public class ConnectionTrackingDriverFactory extends DriverFactoryWithClock +{ + private final Set connections = + Collections.newSetFromMap( new ConcurrentHashMap() ); + + public ConnectionTrackingDriverFactory( Clock clock ) + { + super( clock ); + } + + @Override + protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan, + Logging logging ) + { + Connector connector = super.createConnector( connectionSettings, securityPlan, logging ); + return new ConnectionTrackingConnector( connector, connections ); + } + + public void closeConnections() + { + for ( Connection connection : connections ) + { + connection.close(); + } + connections.clear(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java new file mode 100644 index 0000000000..3446632a69 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import org.neo4j.driver.internal.DriverFactory; + +public class DriverFactoryWithClock extends DriverFactory +{ + private final Clock clock; + + public DriverFactoryWithClock( Clock clock ) + { + this.clock = clock; + } + + @Override + protected Clock createClock() + { + return clock; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FakeClock.java b/driver/src/test/java/org/neo4j/driver/internal/util/FakeClock.java index e46bb38c3c..5d5fae04f7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FakeClock.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FakeClock.java @@ -18,15 +18,15 @@ */ package org.neo4j.driver.internal.util; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.LockSupport; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; - import org.neo4j.driver.internal.EventHandler; import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater; @@ -63,6 +63,11 @@ public void progress( long millis ) private static final AtomicLongFieldUpdater TIMESTAMP = newUpdater( FakeClock.class, "timestamp" ); private PriorityBlockingQueue threads; + public FakeClock() + { + this( (EventHandler) null, false ); + } + public FakeClock( final EventHandler events, boolean progressOnSleep ) { this( events == null ? null : new EventSink() diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index b01fe947ee..db72f4fa39 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -22,10 +22,20 @@ import org.junit.Test; import java.net.URI; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.logging.DevNullLogger; +import org.neo4j.driver.internal.util.ConnectionTrackingDriverFactory; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; @@ -44,6 +54,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class CausalClusteringIT @@ -181,6 +192,47 @@ public Void apply( Session session ) } } + @Test + public void shouldDropBrokenOldSessions() throws Exception + { + Cluster cluster = clusterRule.getCluster(); + + int concurrentSessionsCount = 9; + int livenessCheckTimeoutMinutes = 2; + + Config config = Config.build() + .withSessionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withEncryptionLevel( Config.EncryptionLevel.NONE ) + .toConfig(); + + FakeClock clock = new FakeClock(); + ConnectionTrackingDriverFactory driverFactory = new ConnectionTrackingDriverFactory( clock ); + + URI routingUri = cluster.leader().getRoutingUri(); + RoutingSettings routingSettings = new RoutingSettings( 1, TimeUnit.SECONDS.toMillis( 5 ) ); + AuthToken authToken = clusterRule.getDefaultAuthToken(); + + try ( Driver driver = driverFactory.newInstance( routingUri, authToken, routingSettings, config ) ) + { + // create nodes in different threads using different sessions + createNodesInDifferentThreads( concurrentSessionsCount, driver ); + + // now pool contains many sessions, make them all invalid + driverFactory.closeConnections(); + // move clock forward more than configured liveness check timeout + clock.progress( TimeUnit.MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); + + // now all idle connections should be considered too old and will be verified during acquisition + // they will appear broken because they were closed and new valid connection will be created + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + List records = session.run( "MATCH (n) RETURN count(n)" ).list(); + assertEquals( 1, records.size() ); + assertEquals( concurrentSessionsCount, records.get( 0 ).get( 0 ).asInt() ); + } + } + } + private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException { try ( Driver driver = createDriver( member.getRoutingUri() ) ) @@ -265,4 +317,35 @@ public Logger getLog( String name ) return GraphDatabase.driver( boltUri, clusterRule.getDefaultAuthToken(), config ); } + + private static void createNodesInDifferentThreads( int count, final Driver driver ) throws Exception + { + final CountDownLatch beforeRunLatch = new CountDownLatch( count ); + final CountDownLatch runQueryLatch = new CountDownLatch( 1 ); + final ExecutorService executor = Executors.newCachedThreadPool(); + + for ( int i = 0; i < count; i++ ) + { + executor.submit( new Callable() + { + @Override + public Void call() throws Exception + { + beforeRunLatch.countDown(); + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + runQueryLatch.await(); + session.run( "CREATE ()" ); + } + return null; + } + } ); + } + + beforeRunLatch.await(); + runQueryLatch.countDown(); + + executor.shutdown(); + assertTrue( executor.awaitTermination( 1, TimeUnit.MINUTES ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java index 677fee52d0..49aa65a66f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java @@ -21,14 +21,22 @@ import org.junit.Rule; import org.junit.Test; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.neo4j.driver.internal.DriverFactory; +import org.neo4j.driver.internal.util.DriverFactoryWithClock; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.Neo4jRunner; import org.neo4j.driver.v1.util.TestNeo4j; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** @@ -44,25 +52,19 @@ public class ServerKilledIT public void shouldRecoverFromServerRestart() throws Throwable { // Given - try ( Driver driver = GraphDatabase.driver( Neo4jRunner.DEFAULT_URI, - Config.build().withEncryptionLevel( Config.EncryptionLevel.NONE ).toConfig() ) ) - { - Session s1 = driver.session(); - Session s2 = driver.session(); - Session s3 = driver.session(); - Session s4 = driver.session(); + // config with sessionLivenessCheckTimeout not set, i.e. turned off + Config config = Config.build() + .withEncryptionLevel( Config.EncryptionLevel.NONE ) + .toConfig(); - // And given they are all returned to the connection pool - s1.close(); - s2.close(); - s3.close(); - s4.close(); + try ( Driver driver = GraphDatabase.driver( Neo4jRunner.DEFAULT_URI, config ) ) + { + acquireAndReleaseSessions( 4, driver ); // When neo4j.forceRestart(); // Then we should be able to start using sessions again, at most O(numSessions) session calls later - // TODO: These should value evicted immediately, not show up as application-loggingLevel errors first int toleratedFailures = 4; for ( int i = 0; i < 10; i++ ) { @@ -81,10 +83,53 @@ public void shouldRecoverFromServerRestart() throws Throwable } } - if (toleratedFailures > 0) + if ( toleratedFailures > 0 ) { - fail("This query should have failed " + toleratedFailures + " times"); + fail( "This query should have failed " + toleratedFailures + " times" ); } } } + + @Test + public void shouldDropBrokenOldSessions() throws Throwable + { + // config with set liveness check timeout + int livenessCheckTimeoutMinutes = 10; + Config config = Config.build() + .withSessionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withEncryptionLevel( Config.EncryptionLevel.NONE ) + .toConfig(); + + FakeClock clock = new FakeClock(); + DriverFactory driverFactory = new DriverFactoryWithClock( clock ); + + try ( Driver driver = driverFactory.newInstance( Neo4jRunner.DEFAULT_URI, null, null, config ) ) + { + acquireAndReleaseSessions( 5, driver ); + + // restart database to invalidate all idle connections in the pool + neo4j.forceRestart(); + // move clock forward more than configured liveness check timeout + clock.progress( TimeUnit.MINUTES.toMillis( livenessCheckTimeoutMinutes + 1 ) ); + + // now all idle connections should be considered too old and will be verified during acquisition + // they will appear broken because of the database restart and new valid connection will be created + try ( Session session = driver.session() ) + { + List records = session.run( "RETURN 1" ).list(); + assertEquals( 1, records.size() ); + assertEquals( 1, records.get( 0 ).get( 0 ).asInt() ); + } + } + } + + private static void acquireAndReleaseSessions( int count, Driver driver ) + { + if ( count > 0 ) + { + Session session = driver.session(); + acquireAndReleaseSessions( count - 1, driver ); + session.close(); + } + } } From d840e070f6b0f7e8f6c665705f566dc7423e683e Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 2 Jan 2017 11:55:08 +0100 Subject: [PATCH 2/6] Update license headers to 2017 --- .../java/org/neo4j/driver/internal/spi/ConnectionValidator.java | 2 +- .../org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java | 2 +- .../neo4j/driver/internal/util/ConnectionTrackingConnector.java | 2 +- .../driver/internal/util/ConnectionTrackingDriverFactory.java | 2 +- .../org/neo4j/driver/internal/util/DriverFactoryWithClock.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java index 652ddcbce1..08224448c4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionValidator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002-2016 "Neo Technology," + * Copyright (c) 2002-2017 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] * * This file is part of Neo4j. diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java index 700cd6a8d9..efcf2f51ea 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002-2016 "Neo Technology," + * Copyright (c) 2002-2017 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] * * This file is part of Neo4j. diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java index a5956f3ac1..dc49679fc9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002-2016 "Neo Technology," + * Copyright (c) 2002-2017 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] * * This file is part of Neo4j. diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java index 568ad42dc8..521ea10038 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ConnectionTrackingDriverFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002-2016 "Neo Technology," + * Copyright (c) 2002-2017 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] * * This file is part of Neo4j. diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java index 3446632a69..72b704651f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithClock.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2002-2016 "Neo Technology," + * Copyright (c) 2002-2017 "Neo Technology," * Network Engine for Objects in Lund AB [http://neotechnology.com] * * This file is part of Neo4j. From fb365d8fad28559848b3148e2eb42d35aedbaab2 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 2 Jan 2017 14:53:37 +0100 Subject: [PATCH 3/6] Retry until connection is created `SocketConnectionPool` can test connections during acquisition. This is done when they have been idle in the pool for too long. This commit makes it retry until new connection is created instead of `maxPoolSize`times. --- .../net/pooling/SocketConnectionPool.java | 67 ++++++++++++------- .../net/pooling/SocketConnectionPoolTest.java | 14 +--- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 2e0f5b8489..f3ee7bb1bd 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -124,41 +124,28 @@ private BlockingPooledConnectionQueue pool( BoltServerAddress address ) return pool; } - private PooledConnection acquireConnection( final BoltServerAddress address, - final BlockingPooledConnectionQueue connectionQueue ) + private PooledConnection acquireConnection( BoltServerAddress address, + BlockingPooledConnectionQueue connectionQueue ) { - Supplier supplier = newPooledConnectionSupplier( address, connectionQueue ); + ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address ); - int acquisitionAttempt = 1; - PooledConnection connection = connectionQueue.acquire( supplier ); - while ( !canBeAcquired( connection, acquisitionAttempt ) ) + PooledConnection connection; + boolean connectionCreated; + do { - connection = connectionQueue.acquire( supplier ); - acquisitionAttempt++; + connection = connectionQueue.acquire( connectionSupplier ); + connectionCreated = connectionSupplier.connectionCreated(); } - return connection; - } + while ( !canBeAcquired( connection, connectionCreated ) ); - private Supplier newPooledConnectionSupplier( final BoltServerAddress address, - final BlockingPooledConnectionQueue connectionQueue ) - { - return new Supplier() - { - @Override - public PooledConnection get() - { - PooledConnectionReleaseConsumer releaseConsumer = - new PooledConnectionReleaseConsumer( connectionQueue, connectionValidator ); - return new PooledConnection( connector.connect( address ), releaseConsumer, clock ); - } - }; + return connection; } - private boolean canBeAcquired( PooledConnection connection, int acquisitionAttempt ) + private boolean canBeAcquired( PooledConnection connection, boolean connectionCreated ) { if ( poolSettings.idleTimeBeforeConnectionTestConfigured() ) { - if ( acquisitionAttempt > poolSettings.maxIdleConnectionPoolSize() ) + if ( connectionCreated ) { return true; } @@ -194,4 +181,34 @@ private void assertNotClosed() throw new IllegalStateException( "Pool closed" ); } } + + private class ConnectionSupplier implements Supplier + { + final BlockingPooledConnectionQueue connectionQueue; + final BoltServerAddress address; + + boolean connectionCreated; + + ConnectionSupplier( BlockingPooledConnectionQueue connectionQueue, BoltServerAddress address ) + { + this.connectionQueue = connectionQueue; + this.address = address; + } + + @Override + public PooledConnection get() + { + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( connectionQueue, + connectionValidator ); + Connection connection = connector.connect( address ); + PooledConnection pooledConnection = new PooledConnection( connection, releaseConsumer, clock ); + connectionCreated = true; + return pooledConnection; + } + + boolean connectionCreated() + { + return connectionCreated; + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index 8912193233..5b737c4f7a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -448,7 +448,7 @@ public void limitedNumberOfBrokenConnectionsIsSkippedDuringAcquisition() } @Test - public void acquireRetriesAtMostMaxPoolSizeTimes() + public void acquireRetriesUntilAConnectionIsCreated() { Connection connection1 = newConnectionMock( ADDRESS_1 ); Connection connection2 = newConnectionMock( ADDRESS_1 ); @@ -459,12 +459,10 @@ public void acquireRetriesAtMostMaxPoolSizeTimes() doNothing().doThrow( new RuntimeException() ).when( connection2 ).reset(); doNothing().doThrow( new RuntimeException() ).when( connection3 ).reset(); - int maxIdleConnectionPoolSize = 3; int idleTimeBeforeConnectionTest = 10; FakeClock clock = new FakeClock(); Connector connector = newMockConnector( connection1, connection2, connection3, connection4 ); - SocketConnectionPool pool = - newPool( connector, clock, maxIdleConnectionPoolSize, idleTimeBeforeConnectionTest ); + SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest ); Connection acquiredConnection1 = pool.acquire( ADDRESS_1 ); Connection acquiredConnection2 = pool.acquire( ADDRESS_1 ); @@ -540,13 +538,7 @@ private static SocketConnectionPool newPool( Connector connector ) private static SocketConnectionPool newPool( Connector connector, Clock clock, long idleTimeBeforeConnectionTest ) { - return newPool( connector, clock, 42, idleTimeBeforeConnectionTest ); - } - - private static SocketConnectionPool newPool( Connector connector, Clock clock, int maxIdleConnectionPoolSize, - long idleTimeBeforeConnectionTest ) - { - PoolSettings poolSettings = new PoolSettings( maxIdleConnectionPoolSize, idleTimeBeforeConnectionTest ); + PoolSettings poolSettings = new PoolSettings( 42, idleTimeBeforeConnectionTest ); Logging logging = mock( Logging.class, RETURNS_MOCKS ); return new SocketConnectionPool( poolSettings, connector, clock, logging ); } From 5cd8592bc5926b883e05f757dca668cb6c7d4cc1 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 2 Jan 2017 14:57:43 +0100 Subject: [PATCH 4/6] Better naming of a config method Renamed `withSessionLivenessCheckTimeout` to `withConnectionLivenessCheckTimeout` because all other methods refer to connections instead of sessions and are actually settings for the connection pool and not session pool. --- driver/src/main/java/org/neo4j/driver/v1/Config.java | 12 ++++++------ .../java/org/neo4j/driver/internal/ConfigTest.java | 6 +++--- .../driver/v1/integration/CausalClusteringIT.java | 2 +- .../neo4j/driver/v1/integration/ServerKilledIT.java | 2 +- .../java/org/neo4j/driver/v1/util/cc/Cluster.java | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index cbe608f056..37ba715827 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -261,18 +261,18 @@ public ConfigBuilder withMaxIdleSessions( int size ) } /** - * Please use {@link #withSessionLivenessCheckTimeout(long, TimeUnit)}. + * Please use {@link #withConnectionLivenessCheckTimeout(long, TimeUnit)}. * * @param timeout minimum idle time in milliseconds * @return this builder - * @see #withSessionLivenessCheckTimeout(long, TimeUnit) - * @deprecated please use overloaded method with {@link TimeUnit} parameter. This method will be removed in - * future release. + * @see #withConnectionLivenessCheckTimeout(long, TimeUnit) + * @deprecated please use {@link #withConnectionLivenessCheckTimeout(long, TimeUnit)} method. This method + * will be removed in future release. */ @Deprecated public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) { - withSessionLivenessCheckTimeout( timeout, TimeUnit.MILLISECONDS ); + withConnectionLivenessCheckTimeout( timeout, TimeUnit.MILLISECONDS ); return this; } @@ -298,7 +298,7 @@ public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) * @param unit the unit in which the duration is given * @return this builder */ - public ConfigBuilder withSessionLivenessCheckTimeout( long value, TimeUnit unit ) + public ConfigBuilder withConnectionLivenessCheckTimeout( long value, TimeUnit unit ) { long idleTimeBeforeConnectionTestMillis = unit.toMillis( value ); if ( idleTimeBeforeConnectionTestMillis <= 0 ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java index 954e0dd86c..068f96bfc4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java @@ -85,7 +85,7 @@ public void shouldChangeToTrustedCert() @Test public void shouldSupportLivenessCheckTimeoutSetting() throws Throwable { - Config config = Config.build().withSessionLivenessCheckTimeout( 42, TimeUnit.SECONDS ).toConfig(); + Config config = Config.build().withConnectionLivenessCheckTimeout( 42, TimeUnit.SECONDS ).toConfig(); assertEquals( TimeUnit.SECONDS.toMillis( 42 ), config.idleTimeBeforeConnectionTest() ); } @@ -97,7 +97,7 @@ public void shouldThrowForZeroTimeoutInLivenessCheckTimeoutSetting() throws Thro try { - builder.withSessionLivenessCheckTimeout( 0, TimeUnit.SECONDS ); + builder.withConnectionLivenessCheckTimeout( 0, TimeUnit.SECONDS ); fail( "Exception expected" ); } catch ( Exception e ) @@ -113,7 +113,7 @@ public void shouldThrowForNegativeTimeoutInLivenessCheckTimeoutSetting() throws try { - builder.withSessionLivenessCheckTimeout( -42, TimeUnit.SECONDS ); + builder.withConnectionLivenessCheckTimeout( -42, TimeUnit.SECONDS ); fail( "Exception expected" ); } catch ( Exception e ) diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index db72f4fa39..2f8985e15c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -201,7 +201,7 @@ public void shouldDropBrokenOldSessions() throws Exception int livenessCheckTimeoutMinutes = 2; Config config = Config.build() - .withSessionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) .withEncryptionLevel( Config.EncryptionLevel.NONE ) .toConfig(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java index 49aa65a66f..26ad9af44c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java @@ -96,7 +96,7 @@ public void shouldDropBrokenOldSessions() throws Throwable // config with set liveness check timeout int livenessCheckTimeoutMinutes = 10; Config config = Config.build() - .withSessionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) + .withConnectionLivenessCheckTimeout( livenessCheckTimeoutMinutes, TimeUnit.MINUTES ) .withEncryptionLevel( Config.EncryptionLevel.NONE ) .toConfig(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java index 6cd14cbcf2..568adfc9c8 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java @@ -289,7 +289,7 @@ private static Config driverConfig() .withTrustStrategy( trustAllCertificates() ) .withEncryptionLevel( Config.EncryptionLevel.NONE ) .withMaxIdleSessions( 1 ) - .withSessionLivenessCheckTimeout( TimeUnit.HOURS.toMillis( 1 ) ) + .withConnectionLivenessCheckTimeout( 1, TimeUnit.HOURS ) .toConfig(); } From e00cd6a58ed437abf6279a0b4e61d99f25ff9c4d Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 2 Jan 2017 15:06:16 +0100 Subject: [PATCH 5/6] Allow zero and negative values for `ConnectionLivenessCheckTimeout` So feature can be easier turned on and off by users. Zero timeout means always do test-on-borrow, negative timeout means never to test-on-borrow. --- .../internal/net/pooling/PoolSettings.java | 2 +- .../main/java/org/neo4j/driver/v1/Config.java | 12 ++------ .../org/neo4j/driver/internal/ConfigTest.java | 28 ++++--------------- .../net/pooling/PoolSettingsTest.java | 5 +++- 4 files changed, 14 insertions(+), 33 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java index a1d738d623..de2801c45d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PoolSettings.java @@ -51,6 +51,6 @@ public long idleTimeBeforeConnectionTest() public boolean idleTimeBeforeConnectionTestConfigured() { - return idleTimeBeforeConnectionTest > 0; + return idleTimeBeforeConnectionTest >= 0; } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 37ba715827..d8481af445 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -292,7 +292,8 @@ public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) * application seeing connection problems, and performance. *

* You normally should not need to tune this parameter. - * This feature is turned off by default. + * This feature is turned off by default. Value {@code 0} means connections will always be tested for + * validity and negative values mean connections will never be tested. * * @param value the minimum idle time in milliseconds * @param unit the unit in which the duration is given @@ -300,14 +301,7 @@ public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) */ public ConfigBuilder withConnectionLivenessCheckTimeout( long value, TimeUnit unit ) { - long idleTimeBeforeConnectionTestMillis = unit.toMillis( value ); - if ( idleTimeBeforeConnectionTestMillis <= 0 ) - { - throw new IllegalArgumentException( String.format( - "The timeout value must be positive when converted to ms, but was %d. Given %d %s", - idleTimeBeforeConnectionTestMillis, value, unit ) ); - } - this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTestMillis; + this.idleTimeBeforeConnectionTest = unit.toMillis( value ); return this; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java index 068f96bfc4..bb093407ae 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ConfigTest.java @@ -91,35 +91,19 @@ public void shouldSupportLivenessCheckTimeoutSetting() throws Throwable } @Test - public void shouldThrowForZeroTimeoutInLivenessCheckTimeoutSetting() throws Throwable + public void shouldAllowZeroConnectionLivenessCheckTimeout() throws Throwable { - Config.ConfigBuilder builder = Config.build(); + Config config = Config.build().withConnectionLivenessCheckTimeout( 0, TimeUnit.SECONDS ).toConfig(); - try - { - builder.withConnectionLivenessCheckTimeout( 0, TimeUnit.SECONDS ); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( IllegalArgumentException.class ) ); - } + assertEquals( 0, config.idleTimeBeforeConnectionTest() ); } @Test - public void shouldThrowForNegativeTimeoutInLivenessCheckTimeoutSetting() throws Throwable + public void shouldAllowNegativeConnectionLivenessCheckTimeout() throws Throwable { - Config.ConfigBuilder builder = Config.build(); + Config config = Config.build().withConnectionLivenessCheckTimeout( -42, TimeUnit.SECONDS ).toConfig(); - try - { - builder.withConnectionLivenessCheckTimeout( -42, TimeUnit.SECONDS ); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( IllegalArgumentException.class ) ); - } + assertEquals( TimeUnit.SECONDS.toMillis( -42 ), config.idleTimeBeforeConnectionTest() ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java index efcf2f51ea..f0aa15313d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PoolSettingsTest.java @@ -39,7 +39,9 @@ public void idleTimeBeforeConnectionTestWhenConfigured() @Test public void idleTimeBeforeConnectionTestWhenSetToZero() { - testWithIllegalValue( 0 ); + PoolSettings settings = new PoolSettings( 10, 0 ); + assertTrue( settings.idleTimeBeforeConnectionTestConfigured() ); + assertEquals( 0, settings.idleTimeBeforeConnectionTest() ); } @Test @@ -47,6 +49,7 @@ public void idleTimeBeforeConnectionTestWhenSetToNegativeValue() { testWithIllegalValue( -1 ); testWithIllegalValue( -42 ); + testWithIllegalValue( Integer.MIN_VALUE ); } private static void testWithIllegalValue( int value ) From d916de5d3856b470d1f58c91b3d8b18cd93283e6 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 2 Jan 2017 16:29:59 +0100 Subject: [PATCH 6/6] Fixed javadoc for connection liveness check timeout Changed "session" to "connection" because config should only contain settings for the connection pool. There is no such thing as session pool. --- driver/src/main/java/org/neo4j/driver/v1/Config.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index d8481af445..3be7f76072 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -123,7 +123,7 @@ public int maxIdleConnectionPoolSize() } /** - * Pooled sessions that have been idle in the pool for longer than this timeout + * Pooled connections that have been idle in the pool for longer than this timeout * will be tested before they are used again, to ensure they are still live. * * @return idle time in milliseconds @@ -277,16 +277,15 @@ public ConfigBuilder withSessionLivenessCheckTimeout( long timeout ) } /** - * Pooled sessions that have been idle in the pool for longer than this timeout + * Pooled connections that have been idle in the pool for longer than this timeout * will be tested before they are used again, to ensure they are still live. *

* If this option is set too low, an additional network call will be - * incurred when acquiring a session, which causes a performance hit. + * incurred when acquiring a connection, which causes a performance hit. *

- * If this is set high, you may receive sessions that are no longer live, + * If this is set high, you may receive sessions that are backed by no longer live connections, * which will lead to exceptions in your application. Assuming the - * database is running, these exceptions will go away if you retry acquiring - * sessions. + * database is running, these exceptions will go away if you retry acquiring sessions. *

* Hence, this parameter tunes a balance between the likelihood of your * application seeing connection problems, and performance.