diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java index dbe81ef6e2..6584c7b329 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalSession.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.Logger; @@ -52,7 +53,7 @@ public void run() }; private InternalTransaction currentTransaction; - private boolean isOpen = true; + private AtomicBoolean isOpen = new AtomicBoolean( true ); public InternalSession( Connection connection, Logger logger ) { @@ -100,19 +101,19 @@ public StatementResult run( Statement statement ) @Override public boolean isOpen() { - return isOpen; + return isOpen.get(); } @Override public void close() { - if( !isOpen ) + // Use atomic operation to protect from closing the connection twice (putting back to the pool twice). + if( !isOpen.compareAndSet( true, false ) ) { throw new ClientException( "This session has already been closed." ); } else { - isOpen = false; if ( currentTransaction != null ) { try @@ -124,8 +125,14 @@ public void close() // Best-effort } } - connection.sync(); - connection.close(); + try + { + connection.sync(); + } + finally + { + connection.close(); + } } } @@ -171,7 +178,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction() @Override protected void finalize() throws Throwable { - if( isOpen ) + if( isOpen.compareAndSet( true, false ) ) { logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " + "method on Sessions before disposing of the objects.", null ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java index 6f0e200f81..5b16549c26 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java @@ -157,15 +157,16 @@ public void receiveOne() @Override public void close() - {try { - markAsInUse(); - delegate.close(); - } - finally - { - markAsAvailable(); - } + try + { + markAsInUse(); + delegate.close(); + } + finally + { + markAsAvailable(); + } } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java b/driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java deleted file mode 100644 index 08a1b735c7..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.pool; - -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.exceptions.Neo4jException; - -public interface Allocator -{ - /** - * Called when the pool needs a new value created. The 'release' handle given here will return the object to the - * pool. How it gets invoked is up to the pooled object, but a suggested pattern is for the pooled object to - * implement a 'close' method which calls the release handle. - * - * It is legal for the allocator to fail to allocate a new item. To signal that allocation failed, the allocator - * should throw a {@link Neo4jException} - */ - Value allocate( Consumer release ) throws Neo4jException; - - /** Called when a value gets kicked out of the pool. */ - void onDispose( Value value ); - - /** Called when a value gets acquired from the pool */ - void onAcquire( Value value ); -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java index 9eb9268dab..d7744bdacc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java @@ -24,15 +24,16 @@ import java.util.LinkedList; import java.util.List; import java.util.ServiceLoader; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.connector.socket.SocketConnector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.Connector; import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.exceptions.ClientException; @@ -41,16 +42,16 @@ import static java.lang.String.format; /** - * A basic connection pool that optimizes for threads being long-lived, acquiring/releasing many connections. - * It uses a global queue as a fallback pool, but tries to avoid coordination by storing connections in a ThreadLocal. + * The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first + * try to return the session into the session pool, however if we failed to return it back, either because the pool + * is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the + * session. * - * Safety is achieved by tracking thread locals getting garbage collected, returning connections to the global pool - * when this happens. + * The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to + * pool) when the work with the session has been done. * - * If threads are long-lived, this pool will achieve linearly scalable performance with overhead equivalent to a - * hash-map lookup per acquire. - * - * If threads are short-lived, this pool is not ideal. + * The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool + * at the same time. */ public class InternalConnectionPool implements ConnectionPool { @@ -62,36 +63,26 @@ public class InternalConnectionPool implements ConnectionPool /** * Pools, organized by URL. */ - private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); - - /** - * Connections that fail this criteria will be disposed of. - */ - private final ValidationStrategy connectionValidation; + private final ConcurrentHashMap> pools = new ConcurrentHashMap<>(); private final AuthToken authToken; - /** - * Timeout in milliseconds if there are no available sessions. - */ - private final long acquireSessionTimeout; - private final Clock clock; private final Config config; + /** Shutdown flag */ + private final AtomicBoolean stopped = new AtomicBoolean( false ); + public InternalConnectionPool( Config config, AuthToken authToken ) { - this( loadConnectors(), Clock.SYSTEM, config, authToken, - Long.getLong( "neo4j.driver.acquireSessionTimeout", 30_000 ) ); + this( loadConnectors(), Clock.SYSTEM, config, authToken); } public InternalConnectionPool( Collection conns, Clock clock, Config config, - AuthToken authToken, long acquireTimeout ) + AuthToken authToken ) { this.authToken = authToken; - this.acquireSessionTimeout = acquireTimeout; this.config = config; this.clock = clock; - this.connectionValidation = new PooledConnectionValidator( config.idleTimeBeforeConnectionTest() ); for ( Connector connector : conns ) { for ( String s : connector.supportedSchemes() ) @@ -104,37 +95,37 @@ public InternalConnectionPool( Collection conns, Clock clock, Config @Override public Connection acquire( URI sessionURI ) { - try + if ( stopped.get() ) { - Connection conn = pool( sessionURI ).acquire( acquireSessionTimeout, TimeUnit.MILLISECONDS ); - if ( conn == null ) + throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); + } + BlockingQueue connections = pool( sessionURI ); + PooledConnection conn = connections.poll(); + if ( conn == null ) + { + Connector connector = connectors.get( sessionURI.getScheme() ); + if ( connector == null ) { throw new ClientException( - "Failed to acquire a session with Neo4j " + - "as all the connections in the connection pool are already occupied by other sessions. " + - "Please close unused session and retry. " + - "Current Pool size: " + config.connectionPoolSize() + - ". If your application requires running more sessions concurrently than the current pool " + - "size, you should create a driver with a larger connection pool size." ); + format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.", + sessionURI.getScheme(), sessionURI, connectorSchemes() ) ); } - return conn; - } - catch ( InterruptedException e ) - { - throw new ClientException( "Interrupted while waiting for a connection to Neo4j." ); + conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new + PooledConnectionReleaseConsumer( connections, stopped, config ), clock); } + conn.updateUsageTimestamp(); + return conn; } - private ThreadCachingPool pool( URI sessionURI ) + private BlockingQueue pool( URI sessionURI ) { - ThreadCachingPool pool = pools.get( sessionURI ); + BlockingQueue pool = pools.get( sessionURI ); if ( pool == null ) { - pool = newPool( sessionURI ); + pool = new LinkedBlockingQueue<>(config.maxIdleConnectionPoolSize()); if ( pools.putIfAbsent( sessionURI, pool ) != null ) { // We lost a race to create the pool, dispose of the one we created, and recurse - pool.close(); return pool( sessionURI ); } } @@ -161,48 +152,30 @@ private static Collection loadConnectors() @Override public void close() throws Neo4jException { - for ( ThreadCachingPool pool : pools.values() ) + if( !stopped.compareAndSet( false, true ) ) { - pool.close(); + // already closed or some other thread already started close + return; } - pools.clear(); - } - - private String connectorSchemes() - { - return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) ); - } - - private ThreadCachingPool newPool( final URI uri ) - { - return new ThreadCachingPool<>( config.connectionPoolSize(), new Allocator() + for ( BlockingQueue pool : pools.values() ) { - @Override - public PooledConnection allocate( Consumer release ) + while ( !pool.isEmpty() ) { - Connector connector = connectors.get( uri.getScheme() ); - if ( connector == null ) + PooledConnection conn = pool.poll(); + if ( conn != null ) { - throw new ClientException( - format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.", - uri.getScheme(), uri, connectorSchemes() ) ); + //close the underlying connection without adding it back to the queue + conn.dispose(); } - Connection conn = connector.connect( uri, config, authToken ); - return new PooledConnection( conn, release ); - } - - @Override - public void onDispose( PooledConnection pooledConnection ) - { - pooledConnection.dispose(); } + } - @Override - public void onAcquire( PooledConnection pooledConnection ) - { + pools.clear(); + } - } - }, connectionValidation, clock ); + private String connectorSchemes() + { + return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java index 8d89f46f8c..bcfcd5745a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java @@ -22,24 +22,52 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.Neo4jException; - +/** + * The state of a pooledConnection from a pool point of view could be one of the following: + * Created, + * Available, + * Claimed, + * Closed, + * Disposed. + * + * The state machine looks like: + * + * session.finalize + * session.close failed return to pool + * Created -------> Claimed ----------> Closed ---------> Disposed + * ^ | ^ + * pool.acquire | |returned to pool | + * | | | + * ---- Available <----- | + * | pool.close | + * --------------------------------- + */ public class PooledConnection implements Connection { /** The real connection who will do all the real jobs */ private final Connection delegate; - /** A reference to the {@link ThreadCachingPool pool} so that we could return this resource back */ private final Consumer release; private boolean unrecoverableErrorsOccurred = false; private Runnable onError = null; + private final Clock clock; + private long lastUsed; - public PooledConnection( Connection delegate, Consumer release ) + public PooledConnection( Connection delegate, Consumer release, Clock clock ) { this.delegate = delegate; this.release = release; + this.clock = clock; + this.lastUsed = clock.millis(); + } + + public void updateUsageTimestamp() + { + lastUsed = clock.millis(); } @Override @@ -148,23 +176,15 @@ public void receiveOne() } @Override + /** + * Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a. + * adding back the connection twice into the pool. + */ public void close() { - // In case this session has an open result or transaction or something, - // make sure it's reset to a nice state before we reuse it. - try - { - reset( StreamCollector.NO_OP ); - sync(); - } - catch (Exception ex) - { - dispose(); - } - finally - { - release.accept( this ); - } + release.accept( this ); + // put the full logic of deciding whether to dispose the connection or to put it back to + // the pool into the release object } @Override @@ -221,4 +241,10 @@ private boolean isClientOrTransientError( RuntimeException e ) && (((Neo4jException) e).neo4jErrorCode().contains( "ClientError" ) || ((Neo4jException) e).neo4jErrorCode().contains( "TransientError" )); } + + public long idleTime() + { + long idleTime = clock.millis() - lastUsed; + return idleTime; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java new file mode 100644 index 0000000000..9292a04e83 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java @@ -0,0 +1,129 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.pool; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.util.Consumer; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Value; + +/** + * The responsibility of the PooledConnectionReleaseConsumer is to release valid connections + * back to the connections queue. + */ +class PooledConnectionReleaseConsumer implements Consumer +{ + private final BlockingQueue connections; + private final long minIdleBeforeConnectionTest; + private static final Map NO_PARAMETERS = new HashMap<>(); + private final AtomicBoolean driverStopped; + + PooledConnectionReleaseConsumer( BlockingQueue connections, AtomicBoolean driverStopped, + Config config ) + { + this.connections = connections; + this.driverStopped = driverStopped; + this.minIdleBeforeConnectionTest = config.idleTimeBeforeConnectionTest(); + } + + @Override + public void accept( PooledConnection pooledConnection ) + { + if( driverStopped.get() ) + { + // if the driver already closed, then no need to try to return to pool, just directly close this connection + pooledConnection.dispose(); + } + else if ( validConnection( pooledConnection ) ) + { + boolean released = connections.offer( pooledConnection ); + if( !released ) + { + // if the connection could be put back to the pool, then we let the pool to manage it. + // Otherwise, we close the connection directly here. + pooledConnection.dispose(); + } + else if ( driverStopped.get() ) + { + // If our adding the pooledConnection to the queue was racing with the closing of the driver, + // then the loop where the driver is closing all available connections might not observe our newly + // added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter + // which connection we get back, because other threads might be in the same situation as ours. It only + // matters that we added *a* connection that might not be observed by the loop, and that we dispose of + // *a* connection in response. + PooledConnection conn = connections.poll(); + if ( conn != null ) + { + conn.dispose(); + } + } + } + else + { + pooledConnection.dispose(); + } + } + + boolean validConnection( PooledConnection pooledConnection ) + { + return reset(pooledConnection) && + !pooledConnection.hasUnrecoverableErrors() && + (pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection )); + } + + /** + * In case this session has an open result or transaction or something, + * make sure it's reset to a nice state before we reuse it. + * @param conn the PooledConnection + * @return true if the connection is reset successfully without any error, otherwise false. + */ + private boolean reset( PooledConnection conn ) + { + try + { + conn.reset( StreamCollector.NO_OP ); + conn.sync(); + return true; + } + catch ( Throwable e ) + { + return false; + } + } + + private boolean ping( PooledConnection conn ) + { + try + { + conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, StreamCollector.NO_OP ); + conn.pullAll( StreamCollector.NO_OP ); + conn.sync(); + return true; + } + catch ( Throwable e ) + { + return false; + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionValidator.java deleted file mode 100644 index 5aa358efc2..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionValidator.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.pool; - -import java.util.HashMap; -import java.util.Map; - -import org.neo4j.driver.internal.spi.StreamCollector; -import org.neo4j.driver.v1.Value; - -/** - * Validates connections - determining if they are ok to keep in the pool, or if they should be disposed of. - */ -public class PooledConnectionValidator implements ValidationStrategy -{ - private static final Map NO_PARAMETERS = new HashMap<>(); - - /** - * Connections that have been idle longer than this threshold will have a ping test performed on them. - */ - private final long minIdleBeforeConnectionTest; - - public PooledConnectionValidator( long minIdleBeforeConnectionTest ) - { - this.minIdleBeforeConnectionTest = minIdleBeforeConnectionTest; - } - - @Override - public boolean isValid( PooledConnection conn, long idleTime ) - { - if ( conn.hasUnrecoverableErrors() ) - { - return false; - } - - return idleTime <= minIdleBeforeConnectionTest || ping( conn ); - } - - private boolean ping( PooledConnection conn ) - { - try - { - conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, StreamCollector.NO_OP ); - conn.pullAll( StreamCollector.NO_OP ); - conn.sync(); - return true; - } catch( Throwable e ) - { - return false; - } - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java b/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java deleted file mode 100644 index 03e786e6e0..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/ThreadCachingPool.java +++ /dev/null @@ -1,366 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.pool; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.exceptions.Neo4jException; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -/** - * A general pool implementation, heavily inspired by Chris Vests "stormpot" pool, but without a background thread - * managing allocation. - *

- * Some quick info to understand this pool: - *

  • - * The pool caches a reference for each thread who uses this pool to the resource that has ever been assigned to the - * thread by the pool. - * Next time when the same thread wants to value a resource from the pool again, if the cached resource happens to be - * free, the same resource will be assigned directly to the thread to avoid searching from the global pool. - *
  • - *
  • - * The pool will fail all incoming resource requests once all the resources in the pool has been consumed. But the - * resource requesting thread could choose to wait for a while for a possible available resource. - *
  • - * - * @param A pool of T - */ -public class ThreadCachingPool implements AutoCloseable -{ - /** - * Keeps a reference to a locally cached pool slot, to avoid global lookups. - * Other threads may still access the slot in here, if they cannot acquire an object from the global pool. - */ - private final ThreadLocal> local = new ThreadLocal<>(); - - /** Keeps references to slots that are likely (but not necessarily) live */ - private final BlockingQueue> live = new LinkedBlockingQueue<>(); - - /** Keeps references to slots that have been disposed of. Used when re-allocating. */ - private final BlockingQueue> disposed = new LinkedBlockingQueue<>(); - - /** - * All slots in the pool, used when we shut down to dispose of all instances, as well as when there are no known - * live pool objects, when we use this array to find slots cached by other threads - */ - private final Slot[] all; - - /** Max number of slots in the pool */ - private final int maxSize; - - /** While the pool is initially populating, this tracks indexes into the {@link #all} array */ - private final AtomicInteger nextSlotIndex = new AtomicInteger( 0 ); - - /** Shutdown flag */ - private final AtomicBoolean stopped = new AtomicBoolean( false ); - - private final Allocator allocator; - private final ValidationStrategy validationStrategy; - private final Clock clock; - - public ThreadCachingPool( int targetSize, Allocator allocator, ValidationStrategy validationStrategy, - Clock clock ) - { - this.maxSize = targetSize; - this.allocator = allocator; - this.validationStrategy = validationStrategy; - this.clock = clock; - this.all = new Slot[targetSize]; - } - - public T acquire( long timeout, TimeUnit unit ) throws InterruptedException - { - long deadline = clock.millis() + unit.toMillis( timeout ); - - // 1. Try and value an object from our local slot - Slot slot = local.get(); - - if ( slot != null && slot.availableToClaimed() ) - { - if ( slot.isValid( validationStrategy ) ) - { - allocator.onAcquire( slot.value ); - return slot.value; - } - else - { - // We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it, - // and go to the global pool. - dispose( slot ); - } - } - - // 2. If that fails, acquire from big pool - return acquireFromGlobal( deadline ); - } - - private T acquireFromGlobal( long deadline ) throws InterruptedException - { - Slot slot = live.poll(); - - for (; ; ) - { - if ( stopped.get() ) - { - throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); - } - - // 1. Check if the slot we pulled from the live queue is viable - if ( slot != null ) - { - // Yay, got a slot - can we keep it? - if ( slot.availableToClaimed() ) - { - if ( slot.isValid( validationStrategy ) ) - { - break; - } - else - { - // We've acquired the slot, but the validation strategy says it's time for it to die. - dispose( slot ); - } - } - } - else - { - // 2. Exhausted the likely-to-be-live list, are there any disposed-of slots we can recycle? - slot = disposed.poll(); - if ( slot != null ) - { - // Got a hold of a previously disposed slot! - slot = allocate( slot.index ); - break; - } - - // 3. Can we expand the pool? - int index = nextSlotIndex.get(); - if ( maxSize > index && nextSlotIndex.compareAndSet( index, index + 1 ) ) - { - slot = allocate( index ); - break; - } - } - - // Enforce max wait time - long timeLeft = deadline - clock.millis(); - if ( timeLeft <= 0 ) - { - return null; - } - - // Wait for a bit to see if someone releases something to the live queue - slot = live.poll( Math.min( timeLeft, 10 ), MILLISECONDS ); - } - - // Keep this slot cached with our thread, so that we can grab this value quickly next time, - // assuming threads generally availableToClaimed one instance at a time - local.set( slot ); - allocator.onAcquire( slot.value ); - return slot.value; - } - - private void dispose( Slot slot ) - { - if ( !slot.claimedToDisposed() ) - { - throw new IllegalStateException( "Cannot dispose unclaimed pool object: " + slot ); - } - - // Done before below, in case dispose call fails. This is safe since objects on the - // pool are used for read-only operations - disposed.add( slot ); - allocator.onDispose( slot.value ); - } - - /** - * This method will create allocate a new value, returning the slot in the {@code CLAIMED} state. If allocation - * fails, a slot for the same index will be added to the {@link #disposed} list, and an exception will be thrown. - * @param slotIndex the slot index to use for the new item - * @return a slot in the {@code CLAIMED} state - */ - private Slot allocate( int slotIndex ) - { - final Slot slot = new Slot<>( slotIndex, clock ); - try - { - // Allocate the new item - this may fail with an exception - slot.set( allocator.allocate( createDisposeCallback( slot ) ) ); - - // Store the slot in the global list of slots - all[slotIndex] = slot; - - // Return it :) - return slot; - } - catch( Neo4jException e ) - { - // Failed to allocate slot, return it to the list of disposed slots, rethrow exception. - slot.claimedToDisposed(); - disposed.add( slot ); - throw e; - } - } - - private Consumer createDisposeCallback( final Slot slot ) - { - return new Consumer() - { - @Override - public void accept( T t ) - { - slot.updateUsageTimestamp(); - if ( !slot.isValid( validationStrategy ) ) - { - // The value has for some reason become invalid, dispose of it - dispose( slot ); - return; - } - - if ( !slot.claimedToAvailable() ) - { - throw new IllegalStateException( "Failed to release pooled object: " + slot ); - } - - // Make sure the pool isn't being stopped in the middle of all these shenanigans - if ( !stopped.get() ) - { - // All good, as you were. - live.add( slot ); - } - else - { - // Another thread concurrently closing the pool may have started closing before we - // set our slot to "available". In that case, the slot will not be disposed of by the closing thread - // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot. - // If we can't claim the slot back, that means another thread is dealing with it. - if ( slot.availableToClaimed() ) - { - dispose( slot ); - } - } - } - }; - } - - @Override - public void close() - { - if ( !stopped.compareAndSet( false, true ) ) - { - return; - } - for ( Slot slot : all ) - { - if ( slot != null && slot.availableToClaimed() ) - { - dispose( slot ); - } - } - } -} - -/** - * Stores one pooled resource, along with pooling metadata about it. Every instance the pool manages - * has one of these objects, independent of if it's currently in use or if it is idle in the pool. - */ -class Slot -{ - enum State - { - AVAILABLE, - CLAIMED, - DISPOSED - } - - final AtomicReference state = new AtomicReference<>( State.CLAIMED ); - final int index; - final Clock clock; - - long lastUsed; - T value; - - public static Slot disposed( int index, Clock clock ) - { - Slot slot = new Slot<>( index, clock ); - slot.claimedToDisposed(); - return slot; - } - - /** - * @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is - * disposed - */ - Slot( int index, Clock clock ) - { - this.index = index; - this.clock = clock; - this.lastUsed = 0; - } - - public void set( T value ) - { - this.value = value; - } - - public boolean availableToClaimed() - { - return state.compareAndSet( State.AVAILABLE, State.CLAIMED ); - } - - public boolean claimedToAvailable() - { - updateUsageTimestamp(); - return state.compareAndSet( State.CLAIMED, State.AVAILABLE ); - } - - public boolean claimedToDisposed() - { - return state.compareAndSet( State.CLAIMED, State.DISPOSED ); - } - - public void updateUsageTimestamp() - { - lastUsed = clock.millis(); - } - - boolean isValid( ValidationStrategy strategy ) - { - return strategy.isValid( value, clock.millis() - lastUsed ); - } - - @Override - public String toString() - { - return "Slot{" + - "value=" + value + - ", lastUsed=" + lastUsed + - ", index=" + index + - ", state=" + state.get() + - '}'; - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/pool/ValidationStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java similarity index 64% rename from driver/src/main/java/org/neo4j/driver/internal/pool/ValidationStrategy.java rename to driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java index 5e647f4176..e0954ab8fd 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/pool/ValidationStrategy.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Consumers.java @@ -16,9 +16,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.neo4j.driver.internal.pool; +package org.neo4j.driver.internal.util; -public interface ValidationStrategy +public final class Consumers { - boolean isValid( T value, long idleTime ); + private Consumers() + { + throw new UnsupportedOperationException( "Do not instantiate" ); + } + + public static Consumer noOp() + { + return new Consumer() + { + @Override + public void accept( T t ) + { + //Do nothing + } + }; + } } diff --git a/driver/src/main/java/org/neo4j/driver/v1/Config.java b/driver/src/main/java/org/neo4j/driver/v1/Config.java index 32643f97e0..261c286748 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Config.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Config.java @@ -51,6 +51,8 @@ public class Config /** The size of connection pool for each database url */ private final int connectionPoolSize; + private final int maxIdleConnectionPoolSize; + /** Connections that have been idle longer than this threshold will have a ping test performed on them. */ private final long idleTimeBeforeConnectionTest; @@ -65,6 +67,7 @@ private Config( ConfigBuilder builder ) this.logging = builder.logging; this.connectionPoolSize = builder.connectionPoolSize; + this.maxIdleConnectionPoolSize = builder.maxIdleConnectionPoolSize; this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest; this.encryptionLevel = builder.encruptionLevel; @@ -84,9 +87,19 @@ public Logging logging() * Max number of connections per URL for this driver. * @return the max number of connections */ + @Deprecated public int connectionPoolSize() { - return connectionPoolSize; + return maxIdleConnectionPoolSize; + } + + /** + * Max number of idle connections per URL for this driver. + * @return the max number of connections + */ + public int maxIdleConnectionPoolSize() + { + return maxIdleConnectionPoolSize; } /** @@ -139,6 +152,7 @@ public static class ConfigBuilder { private Logging logging = new JULogging( Level.INFO ); private int connectionPoolSize = 50; + private int maxIdleConnectionPoolSize = 10; private long idleTimeBeforeConnectionTest = 200; private EncryptionLevel encruptionLevel = EncryptionLevel.REQUIRED; private TrustStrategy trustStrategy = trustOnFirstUse( @@ -169,12 +183,27 @@ public ConfigBuilder withLogging( Logging logging ) * @param size the max number of sessions to keep open * @return this builder */ + @Deprecated public ConfigBuilder withMaxSessions( int size ) { this.connectionPoolSize = size; return this; } + /** + * The max number of idle sessions to keep open at once. Configure this + * higher if you want more concurrent sessions, or lower if you want + * to lower the pressure on the database instance. + * + * @param size the max number of idle sessions to keep open + * @return this builder + */ + public ConfigBuilder withMaxIdleSessions( int size ) + { + this.maxIdleConnectionPoolSize = size; + return this; + } + /** * Pooled sessions that have been unused for longer than this timeout * will be tested before they are used again, to ensure they are still live. diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java index 72eb47e50d..f56cf31c04 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalSessionTest.java @@ -24,10 +24,14 @@ import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.Logger; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.exceptions.ClientException; +import static junit.framework.Assert.fail; import static junit.framework.TestCase.assertNotNull; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -135,4 +139,30 @@ public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throw // When sess.beginTransaction(); } + + @Test + public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwable + { + // Given + InternalSession sess = new InternalSession( mock(Connection.class), mock(Logger.class) ); + try + { + sess.close(); + } + catch( Exception e ) + { + fail("Should not get any problem to close first time"); + } + + // When + try + { + sess.close(); + fail( "Should have received an error to close second time" ); + } + catch( Exception e ) + { + assertThat( e.getMessage(), equalTo("This session has already been closed." )); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java index ba0a8fc87a..c164dcb578 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/pool/ConnectionInvalidationTest.java @@ -22,8 +22,16 @@ import org.mockito.Mockito; import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.StreamCollector; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.Consumers; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.TransientException; @@ -33,24 +41,79 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ConnectionInvalidationTest { private final Connection delegate = mock( Connection.class ); - private final PooledConnection conn = new PooledConnection( delegate, null ); + Clock clock = mock( Clock.class ); + private final PooledConnection conn = + new PooledConnection( delegate, Consumers.noOp(), Clock.SYSTEM ); + + @SuppressWarnings( "unchecked" ) @Test public void shouldInvalidateConnectionThatIsOld() throws Throwable { // Given a connection that's broken - Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).sync(); + Mockito.doThrow( new ClientException( "That didn't work" ) ) + .when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) ); + Config config = Config.defaultConfig(); + when( clock.millis() ).thenReturn( 0L, config.idleTimeBeforeConnectionTest() + 1L ); + PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); + + // When/Then + BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionReleaseConsumer consumer = + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config ); + consumer.accept( conn ); + + verify( queue, never() ).add( conn ); + } + + @SuppressWarnings( "unchecked" ) + @Test + public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable + { + // Given a connection that's broken + Mockito.doThrow( new ClientException( "That didn't work" ) ) + .when( delegate ).run( anyString(), anyMap(), any( StreamCollector.class ) ); + Config config = Config.defaultConfig(); + when( clock.millis() ).thenReturn( 0L, config.idleTimeBeforeConnectionTest() - 1L ); + PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); + + // When/Then + BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionReleaseConsumer consumer = + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config ); + consumer.accept( conn ); + + verify( queue ).offer( conn ); + } + + @Test + public void shouldInvalidConnectionIfFailedToReset() throws Throwable + { + // Given a connection that's broken + Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).reset( any( StreamCollector.class ) ); + Config config = Config.defaultConfig(); + PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); // When/Then - assertTrue( new PooledConnectionValidator( 10 ).isValid( conn, 1 ) ); - assertFalse(new PooledConnectionValidator( 10 ).isValid( conn, 100 )); - assertFalse(new PooledConnectionValidator( 10 ).isValid( conn, 10 )); + BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionReleaseConsumer consumer = + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), config ); + consumer.accept( conn ); + + verify( queue, never() ).add( conn ); } @Test @@ -75,43 +138,56 @@ public void shouldInvalidateOnProtocolViolationExceptions() throws Throwable assertUnrecoverable( new ClientException( "Neo.ClientError.Request.Invalid", "Hello, world!" ) ); } + @SuppressWarnings( "unchecked" ) private void assertUnrecoverable( Neo4jException exception ) { - doThrow( exception ).when( delegate ).sync(); + doThrow( exception ).when( delegate ) + .run( eq("assert unrecoverable"), anyMap(), any( StreamCollector.class ) ); // When try { - conn.sync(); - fail("Should've rethrown exception"); + conn.run( "assert unrecoverable", new HashMap( ), StreamCollector.NO_OP ); + fail( "Should've rethrown exception" ); } - catch( Neo4jException e ) + catch ( Neo4jException e ) { - assertThat(e, equalTo( exception )); + assertThat( e, equalTo( exception ) ); } // Then assertTrue( conn.hasUnrecoverableErrors() ); - assertFalse( new PooledConnectionValidator( 100 ).isValid( conn, 1 ) ); + BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionReleaseConsumer consumer = + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), Config.defaultConfig() ); + consumer.accept( conn ); + + verify( queue, never() ).offer( conn ); } + @SuppressWarnings( "unchecked" ) private void assertRecoverable( Neo4jException exception ) { - doThrow( exception ).when( delegate ).sync(); + doThrow( exception ).when( delegate ).run( eq("assert recoverable"), anyMap(), any( StreamCollector.class ) ); // When try { - conn.sync(); - fail("Should've rethrown exception"); + conn.run( "assert recoverable", new HashMap( ), StreamCollector.NO_OP ); + fail( "Should've rethrown exception" ); } - catch( Neo4jException e ) + catch ( Neo4jException e ) { - assertThat(e, equalTo( exception )); + assertThat( e, equalTo( exception ) ); } // Then assertFalse( conn.hasUnrecoverableErrors() ); - assertTrue( new PooledConnectionValidator( 100 ).isValid( conn, 1 ) ); + BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionReleaseConsumer consumer = + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), Config.defaultConfig() ); + consumer.accept( conn ); + + verify( queue ).offer( conn ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/pool/InternalConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/pool/InternalConnectionPoolTest.java index 56af44c604..897ec18f3f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/pool/InternalConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/pool/InternalConnectionPoolTest.java @@ -18,9 +18,7 @@ */ package org.neo4j.driver.internal.pool; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.net.URI; import java.util.Collections; @@ -31,9 +29,10 @@ import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; -import org.neo4j.driver.v1.exceptions.ClientException; import static java.util.Collections.singletonList; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -44,30 +43,6 @@ public class InternalConnectionPoolTest { - @Rule - public ExpectedException exception = ExpectedException.none(); - - @Test - public void shouldThrowExceptionWhenConnectionPoolIsFull() throws Throwable - { - // Given - URI uri = URI.create( "bolt://asd" ); - Connector connector = connector( "bolt" ); - Config config = Config.build().withMaxSessions( 1 ).toConfig(); - InternalConnectionPool pool = new InternalConnectionPool( singletonList( connector ), - Clock.SYSTEM, config, AuthTokens.none(), 100 ); - - // When & Then - pool.acquire( uri ); - - exception.expect( ClientException.class ); - exception.expectMessage( - "Failed to acquire a session with Neo4j as all the connections in the connection pool are already" + - " occupied by other sessions."); - - pool.acquire( uri ); - } - @Test public void shouldAcquireAndRelease() throws Throwable { @@ -76,16 +51,17 @@ public void shouldAcquireAndRelease() throws Throwable Connector connector = connector( "bolt" ); Config config = Config.defaultConfig(); InternalConnectionPool pool = new InternalConnectionPool( singletonList( connector ), - Clock.SYSTEM, config, AuthTokens.none(), 100 ); + Clock.SYSTEM, config, AuthTokens.none()); Connection conn = pool.acquire( uri ); conn.close(); // When - pool.acquire( uri ); + Connection acquired = pool.acquire( uri ); // Then verify( connector, times( 1 ) ).connect( uri, config, AuthTokens.none() ); + assertThat( acquired, equalTo(conn) ); } private Connector connector( String scheme ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/pool/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/pool/PooledConnectionTest.java index 67f93576fe..cf0f772686 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/pool/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/pool/PooledConnectionTest.java @@ -20,45 +20,205 @@ import org.junit.Test; -import java.util.LinkedList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.spi.StreamCollector; -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.exceptions.DatabaseException; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Config; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; public class PooledConnectionTest { @Test - public void shouldReturnToPoolIfExceptionDuringReset() throws Throwable + public void shouldDisposeConnectionIfNotValidConnection() throws Throwable { // Given - final LinkedList returnedToPool = new LinkedList<>(); + final BlockingQueue pool = new LinkedBlockingQueue<>(1); + + final boolean[] flags = {false}; + + Connection conn = mock( Connection.class ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, + new AtomicBoolean( false ), Config.defaultConfig() /*Does not matter what config for this test*/ ) + { + @Override + boolean validConnection( PooledConnection conn ) + { + return false; + } + }; + + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + { + @Override + public void dispose() + { + flags[0] = true; + } + }; + + // When + pooledConnection.close(); + + // Then + assertThat( pool.size(), equalTo( 0 ) ); + assertThat( flags[0], equalTo( true ) ); + } + + @Test + public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throws Throwable + { + // Given + final BlockingQueue pool = new LinkedBlockingQueue<>(1); + + final boolean[] flags = {false}; + + Connection conn = mock( Connection.class ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, + new AtomicBoolean( false ), Config.defaultConfig() /*Does not matter what config for this test*/ ) + { + @Override + boolean validConnection( PooledConnection conn ) + { + return true; + } + }; + + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + { + @Override + public void dispose() + { + flags[0] = true; + } + }; + + // When + pooledConnection.close(); + + // Then + assertThat( pool, hasItem(pooledConnection) ); + assertThat( pool.size(), equalTo( 1 ) ); + assertThat( flags[0], equalTo( false ) ); + } + + @Test + public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable + { + // Given + final BlockingQueue pool = new LinkedBlockingQueue<>(1); + + final boolean[] flags = {false}; + + Connection conn = mock( Connection.class ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, + new AtomicBoolean( false ), Config.defaultConfig() /*Does not matter what config for this test*/ ) + { + @Override + boolean validConnection( PooledConnection conn ) + { + return true; + } + }; + + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ); + PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + { + @Override + public void dispose() + { + flags[0] = true; + } + }; + + // When + pooledConnection.close(); + shouldBeClosedConnection.close(); + + // Then + assertThat( pool, hasItem(pooledConnection) ); + assertThat( pool.size(), equalTo( 1 ) ); + assertThat( flags[0], equalTo( true ) ); + } + + @Test + public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable + { + // driver = GraphDatabase.driver(); + // session = driver.session(); + // ... + // driver.close() -> clear the pools + // session.close() -> well, close the connection directly without putting back to the pool + + // Given + final BlockingQueue pool = new LinkedBlockingQueue<>(1); + final boolean[] flags = {false}; + + Connection conn = mock( Connection.class ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, + new AtomicBoolean( true ), Config.defaultConfig() /*Does not matter what config for this test*/ ); + + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + { + @Override + public void dispose() + { + flags[0] = true; + } + }; + + // When + pooledConnection.close(); + + // Then + assertThat( pool.size(), equalTo( 0 ) ); + assertThat( flags[0], equalTo( true ) ); // make sure that the dispose is called + } + + @Test + public void shouldDisposeConnectionIfPoolStoppedAfterPuttingConnectionBackToPool() throws Throwable + { + // Given + final AtomicBoolean stopped = new AtomicBoolean( false ); + final BlockingQueue pool = new LinkedBlockingQueue(1){ + public boolean offer(PooledConnection conn) + { + stopped.set( true ); + // some clean work to close all connection in pool + boolean offer = super.offer( conn ); + assertThat ( this.size(), equalTo( 1 ) ); + // we successfully put the connection back to the pool + return offer; + } + }; + final boolean[] flags = {false}; + Connection conn = mock( Connection.class ); - doThrow( new DatabaseException( "asd", "asd" ) ).when(conn).reset( any( StreamCollector.class) ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, + stopped , Config.defaultConfig() /*Does not matter what config for this test*/ ); - PooledConnection pooledConnection = new PooledConnection( conn, new Consumer() + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override - public void accept( PooledConnection pooledConnection ) + public void dispose() { - returnedToPool.add( pooledConnection ); + flags[0] = true; } - } ); + }; // When pooledConnection.close(); // Then - assertThat( returnedToPool, hasItem(pooledConnection) ); - assertThat( returnedToPool.size(), equalTo( 1 )); + assertThat( pool.size(), equalTo( 0 ) ); + assertThat( flags[0], equalTo( true ) ); // make sure that the dispose is called } -} \ No newline at end of file + +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java deleted file mode 100644 index 9405b8058a..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/pool/ThreadCachingPoolTest.java +++ /dev/null @@ -1,399 +0,0 @@ -/** - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.pool; - -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.exceptions.ClientException; - -import static junit.framework.TestCase.fail; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertNull; - -public class ThreadCachingPoolTest -{ - private final List inUse = new LinkedList<>(); - private final List inPool = new LinkedList<>(); - private final List disposed = new LinkedList<>(); - - private static AtomicInteger IDGEN = new AtomicInteger(); - - @Rule - public ExpectedException exception = ExpectedException.none(); - - private final ValidationStrategy checkInvalidateFlag = new ValidationStrategy() - { - @Override - public boolean isValid( PooledObject value, long idleTime ) - { - return value.valid; - } - }; - - /** Allocator that allocates pooled objects and tracks their current state (pooled, used, disposed) */ - private final TestAllocator trackAllocator = new TestAllocator(); - - @Test - public void shouldDisposeAllOnClose() throws Throwable - { - // Given - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS ); - PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS ); - - o1.release(); - o2.release(); - - // When - pool.close(); - - // Then - assertThat( inUse, equalTo( none() ) ); - assertThat( inPool, equalTo( none() ) ); - assertThat( disposed, equalTo( items( o1, o2 ) ) ); - } - - @Test - public void shouldDisposeValuesReleasedAfterClose() throws Throwable - { - // Given - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - PooledObject o1 = pool.acquire( 10, TimeUnit.SECONDS ); - PooledObject o2 = pool.acquire( 10, TimeUnit.SECONDS ); - - o1.release(); - pool.close(); - - // When - o2.release(); - - // Then - assertThat( inUse, equalTo( none() ) ); - assertThat( inPool, equalTo( none() ) ); - assertThat( disposed, equalTo( items( o1, o2 ) ) ); - } - - @Test - public void shouldBlockUpToTimeoutIfNoneAvailable() throws Throwable - { - // Given - ThreadCachingPool - pool = new ThreadCachingPool<>( 1, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - pool.acquire( 10, TimeUnit.SECONDS ); - - // When - PooledObject val = pool.acquire( 1, TimeUnit.SECONDS ); - - // Then - assertNull( val ); - } - - @Test - public void shouldDisposeOfInvalidItems() throws Throwable - { - // Given - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, invalidIfIdIs(0), Clock.SYSTEM ); - - // And given we've allocated/releasd object with id 0 once (no validation on first allocation) - // TODO: Is that the right thing to do? I assume the allocator will allocate healthy objects.. - pool.acquire( 10, TimeUnit.SECONDS ).release(); - - // When - pool.acquire( 10, TimeUnit.SECONDS ); - - // Then object with id 0 should've been disposed of, and we should have one live object with id 1 - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( items( 1 ) ) ); - assertThat( disposed, equalTo( items( 0 ) ) ); - } - - @Test - public void shouldNotAllocateNewValuesAfterClose() throws Throwable - { - // Given a pool that's been closed - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - pool.close(); - - // Expect - exception.expect( IllegalStateException.class ); - - // When - pool.acquire( 10, TimeUnit.SECONDS ); - } - - @Test - public void shouldDisposeOfObjectsThatBecomeInvalidWhileInUse() throws Throwable - { - // Given a pool that's been closed - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - PooledObject val = pool.acquire( 10, TimeUnit.SECONDS ); - - // When - val.invalidate().release(); - - // Then - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( none() ) ); - assertThat( disposed, equalTo( items( val ) ) ); - } - - @Test - public void shouldRecoverFromItemCreationFailure() throws Throwable - { - // Given a pool where creation will fail from the value-go - ThreadCachingPool - pool = new ThreadCachingPool<>( 4, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - trackAllocator.startEmulatingCreationFailures(); - - // And given I've acquire a few items, failing to do so - for ( int i = 0; i < 4; i++ ) - { - try - { - pool.acquire( 10, TimeUnit.SECONDS ); - fail("Should not succeed at allocating any item here."); - } - catch( ClientException e ) - { - // Expected - } - } - - // When creation starts working again - trackAllocator.stopEmulatingCreationFailures(); - - // Then I should be able to allocate things - for ( int i = 0; i < 4; i++ ) - { - pool.acquire( 10, TimeUnit.SECONDS ); - } - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( items( 0, 1, 2, 3 ) ) ); - assertThat( disposed, equalTo( none() ) ); // because allocation fails, onDispose is not called - } - - @Test - public void shouldRecovedDisposedItemReallocationFailing() throws Throwable - { - // Given a pool where creation will fail from the value-go - ThreadCachingPool - pool = new ThreadCachingPool<>( 2, trackAllocator, checkInvalidateFlag, Clock.SYSTEM ); - - // And given I've allocated and released some stuff, and it became invalid, such that I have a set - // of disposed-of slots in the pool - PooledObject first = pool.acquire( 10, TimeUnit.SECONDS ); - PooledObject second = pool.acquire( 10, TimeUnit.SECONDS ); - first.invalidate(); - second.invalidate(); - first.release(); - second.release(); - - // And given (bear with me here!) allocation starts failing - trackAllocator.startEmulatingCreationFailures(); - - // And I try and allocate some stuff, failing at it - for ( int i = 0; i < 2; i++ ) - { - try - { - pool.acquire( 10, TimeUnit.SECONDS ); - fail( "Should not succeed at allocating any item here." ); - } - catch ( ClientException e ) - { - // Expected - } - } - - // When creation starts working again - trackAllocator.stopEmulatingCreationFailures(); - - // Then I should be able to allocate things - for ( int i = 0; i < 2; i++ ) - { - pool.acquire( 10, TimeUnit.SECONDS ); - } - assertThat( inPool, equalTo( none() ) ); - assertThat( inUse, equalTo( items( 2, 3 ) ) ); - // only the first two items value onDispose called, since allocation fails after that - assertThat( disposed, equalTo( items( 0, 1) ) ); - } - - private List items( int ... objects ) - { - List out = new LinkedList<>(); - for ( int id : objects ) - { - out.add( new PooledObject( id, null ) ); - } - return out; - } - - private List items( PooledObject ... objects ) - { - return Arrays.asList(objects); - } - - private List none() - { - return Collections.emptyList(); - } - - private ValidationStrategy invalidIfIdIs( final int i ) - { - return new ValidationStrategy() - { - @Override - public boolean isValid( PooledObject value, long idleTime ) - { - return value.id != i; - } - }; - } - - @Before - public void reset() - { - IDGEN.set( 0 ); - } - - private class PooledObject - { - private final int id; - private Consumer release; - private boolean valid = true; - - public PooledObject( Consumer release ) - { - this(IDGEN.getAndIncrement(), release); - } - - public PooledObject( int id, Consumer release ) - { - this.id = id; - this.release = release; - } - - public PooledObject release() - { - inUse.remove( this ); - inPool.add( this ); - release.accept( this ); - return this; - } - - public PooledObject invalidate() - { - this.valid = false; - return this; - } - - @Override - public String toString() - { - return "PooledObject<" + id + ">"; - } - - @Override - public boolean equals( Object o ) - { - if ( this == o ) - { return true; } - if ( o == null || getClass() != o.getClass() ) - { return false; } - - PooledObject that = (PooledObject) o; - - return id == that.id; - - } - - @Override - public int hashCode() - { - return id; - } - } - - private class TestAllocator implements Allocator - { - private ClientException creationException; - - @Override - public PooledObject allocate( Consumer release ) - { - if( creationException != null ) - { - throw creationException; - } - PooledObject p = new PooledObject( release ); - inPool.add( p ); - return p; - } - - @Override - public void onDispose( PooledObject o ) - { - inPool.remove( o ); - inUse.remove( o ); - disposed.add( o ); - } - - @Override - public void onAcquire( PooledObject o ) - { - inPool.remove( o ); - inUse.add( o ); - } - - public void startEmulatingCreationFailures() - { - this.creationException = new ClientException( "Failed to create item," ); - } - - public void stopEmulatingCreationFailures() - { - this.creationException = null; - } - } -} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java index e5d9f91273..42d657aed3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java @@ -57,7 +57,7 @@ public void shouldRecoverFromServerRestart() throws Throwable s4.close(); // When - neo4j.restart(); + neo4j.forceRestart(); // Then we should be able to start using sessions again, at most O(numSessions) session calls later // TODO: These should value evicted immediately, not show up as application-loggingLevel errors first @@ -78,6 +78,11 @@ public void shouldRecoverFromServerRestart() throws Throwable } } } + + if (toleratedFailures > 0) + { + fail("This query should have failed " + toleratedFailures + " times"); + } } } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java b/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java new file mode 100644 index 0000000000..64dd5727a7 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/stress/SessionPoolingStressIT.java @@ -0,0 +1,120 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.stress; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.util.TestNeo4j; + +import static java.util.Arrays.asList; +import static org.neo4j.driver.v1.GraphDatabase.driver; + +public class SessionPoolingStressIT +{ + @Rule + public TestNeo4j neo4j = new TestNeo4j(); + + private static final int N_THREADS = 50; + private final ExecutorService executor = Executors.newFixedThreadPool( N_THREADS ); + private static final List QUERIES = asList( "RETURN 1295 + 42", "UNWIND range(1,10000) AS x CREATE (n {prop:x}) DELETE n " ); + private static final int MAX_TIME = 10000; + private final AtomicBoolean hasFailed = new AtomicBoolean( false ); + + @Test + public void shouldWorkFine() throws InterruptedException + { + Driver driver = driver( neo4j.address(), + Config.build() + .withEncryptionLevel( Config.EncryptionLevel.NONE ) + .withMaxSessions( N_THREADS ).toConfig() ); + + doWork( driver ); + executor.awaitTermination( MAX_TIME + (int)(MAX_TIME * 0.2), TimeUnit.MILLISECONDS ); + driver.close(); + } + + private void doWork( final Driver driver ) + { + for ( int i = 0; i < N_THREADS; i++ ) + { + executor.execute( new Worker( driver ) ); + } + } + + private class Worker implements Runnable + { + private final Random random = ThreadLocalRandom.current(); + private final Driver driver; + + public Worker( Driver driver ) + { + this.driver = driver; + } + + @Override + public void run() + { + try + { + long deadline = System.currentTimeMillis() + MAX_TIME; + for (;;) + { + for ( String query : QUERIES ) + { + runQuery( query ); + } + long left = deadline - System.currentTimeMillis(); + if ( left <= 0 ) + { + break; + } + } + } + catch ( Throwable e ) + { + e.printStackTrace(); + hasFailed.set( true ); + } + } + + private void runQuery( String query ) throws InterruptedException + { + try ( Session session = driver.session() ) + { + StatementResult run = session.run( query ); + Thread.sleep( random.nextInt( 100 ) ); + run.consume(); + Thread.sleep( random.nextInt( 100 ) ); + } + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java index d258c309d3..38204fff09 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverComplianceIT.java @@ -32,7 +32,7 @@ * The base class to run all cucumber tests */ @RunWith( DriverCucumberAdapter.class ) -@CucumberOptions( features = {"target/resources/features"}, strict=true, tags={"~@db"}, format = {"pretty"}) +@CucumberOptions( features = {"target/resources/features"}, strict=true, tags={"~@db", "~@fixed_session_pool"}, format = {"pretty"}) public class DriverComplianceIT { @Rule diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/Environment.java b/driver/src/test/java/org/neo4j/driver/v1/tck/Environment.java index fa493fa572..32a67a7d1a 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/Environment.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/Environment.java @@ -49,7 +49,7 @@ public class Environment public static Map mapOfObjects; public static Map mappedTypes; - public static Driver driver = neo4j.driver(); + public static Driver driver; @Before @@ -64,6 +64,7 @@ public void resetValues() stringRunner = null; runners = new ArrayList<>(); mappedTypes = new HashMap<>( ); + driver = neo4j.driver(); } @After diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java index f1bc0a7abb..7ec7760490 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4j.java @@ -74,6 +74,11 @@ public void restart() throws Exception runner.restartNeo4j(); } + public void forceRestart() throws Exception + { + runner.forceToRestart(); + } + public void restart(Neo4jSettings neo4jSettings) throws Exception { runner.restartNeo4j( neo4jSettings );