From b2f452e49e638fbc197fd551dde8d944173f8715 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 7 Jul 2017 11:22:26 +0200 Subject: [PATCH] Backport commit from 1.5 to 1.4: Improve disposal of broken connections Connection pool can discard broken connections during acquisition (when connection liveness check timeout is configured) and when connections are returned to the pool. In both cases connections should be disposed and removed from the set of active connections. This is especially important with least connected load balancing strategy which examines amount of active connections for each address. This commit makes sure broken connections are disposed through the connections queue to make sure active set is always updated. --- .../BlockingPooledConnectionQueue.java | 10 +- .../PooledConnectionReleaseConsumer.java | 2 +- .../net/pooling/SocketConnectionPool.java | 14 ++- .../BlockingPooledConnectionQueueTest.java | 61 +++++++++++ .../PooledConnectionReleaseConsumerTest.java | 100 ++++++++++++++++++ .../net/pooling/SocketConnectionPoolTest.java | 98 +++++++++++++++++ 6 files changed, 281 insertions(+), 4 deletions(-) create mode 100644 driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java index a62f757aa5..10d335a51e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java @@ -105,9 +105,15 @@ public PooledConnection acquire( Supplier supplier ) return connection; } - public List toList() + public int activeConnections() { - return new ArrayList<>( queue ); + return acquiredConnections.size(); + } + + void disposeBroken( PooledConnection connection ) + { + acquiredConnections.remove( connection ); + disposeSafely( connection ); } public boolean isEmpty() diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index 84430fe27d..a7431d94e3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection ) } else { - pooledConnection.dispose(); + connections.disposeBroken( pooledConnection ); } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 463525ecaa..06184dcae6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -109,6 +109,12 @@ public void close() } } + public int activeConnections( BoltServerAddress address ) + { + BlockingPooledConnectionQueue connectionQueue = pools.get( address ); + return connectionQueue == null ? 0 : connectionQueue.activeConnections(); + } + private BlockingPooledConnectionQueue pool( BoltServerAddress address ) { BlockingPooledConnectionQueue pool = pools.get( address ); @@ -130,10 +136,16 @@ private PooledConnection acquireConnection( BoltServerAddress address, { ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address ); - PooledConnection connection; + PooledConnection connection = null; boolean connectionCreated; do { + // dispose previous connection that can't be acquired + if ( connection != null ) + { + connectionQueue.disposeBroken( connection ); + } + connection = connectionQueue.acquire( connectionSupplier ); connectionCreated = connectionSupplier.connectionCreated(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java index 67d6fad4bd..98aaf7539f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java @@ -239,6 +239,67 @@ public void shouldTerminateBothAcquiredAndIdleConnections() verify( connection4 ).dispose(); } + @Test + public void shouldReportZeroActiveConnectionsWhenEmpty() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + public void shouldReportZeroActiveConnectionsWhenHasOnlyIdleConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + queue.offer( mock( PooledConnection.class ) ); + queue.offer( mock( PooledConnection.class ) ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void shouldReportActiveConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + PooledConnection connection3 = mock( PooledConnection.class ); + + queue.offer( connection1 ); + queue.offer( connection2 ); + queue.offer( connection3 ); + + queue.acquire( mock( Supplier.class ) ); + queue.acquire( mock( Supplier.class ) ); + queue.acquire( mock( Supplier.class ) ); + + assertEquals( 3, queue.activeConnections() ); + + queue.offer( connection1 ); + queue.offer( connection2 ); + queue.offer( connection3 ); + + assertEquals( 0, queue.activeConnections() ); + } + + @Test + @SuppressWarnings( "unchecked" ) + public void shouldDisposeBrokenConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue( 5 ); + + queue.offer( mock( PooledConnection.class ) ); + PooledConnection connection = queue.acquire( mock( Supplier.class ) ); + assertEquals( 1, queue.activeConnections() ); + + queue.disposeBroken( connection ); + assertEquals( 0, queue.activeConnections() ); + verify( connection ).dispose(); + } + private static BlockingPooledConnectionQueue newConnectionQueue( int capacity ) { return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java new file mode 100644 index 0000000000..305876daa4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumerTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import org.junit.Test; + +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; + +public class PooledConnectionReleaseConsumerTest +{ + @Test + public void shouldOfferReusableConnectionsBackToTheConnectionsQueue() + { + BlockingPooledConnectionQueue queue = newConnectionQueue(); + PooledConnection connection = acquireConnection( queue ); + + PooledConnectionValidator validator = newConnectionValidator( true ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator ); + + releaseConsumer.accept( connection ); + + // connection should now be idle + assertEquals( 0, queue.activeConnections() ); + assertEquals( 1, queue.size() ); + + verify( connection ).reset(); + verify( connection ).sync(); + } + + @Test + public void shouldAskConnectionsQueueToDisposeNotReusableConnections() + { + BlockingPooledConnectionQueue queue = newConnectionQueue(); + PooledConnection connection = acquireConnection( queue ); + + PooledConnectionValidator validator = newConnectionValidator( false ); + PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator ); + + releaseConsumer.accept( connection ); + + // connection should've been disposed + assertEquals( 0, queue.activeConnections() ); + assertEquals( 0, queue.size() ); + + verify( connection ).dispose(); + } + + private static BlockingPooledConnectionQueue newConnectionQueue() + { + return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING ); + } + + @SuppressWarnings( "unchecked" ) + private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue ) + { + queue.offer( newConnectionMock() ); + PooledConnection connection = queue.acquire( mock( Supplier.class ) ); + assertEquals( 1, queue.activeConnections() ); + return connection; + } + + private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections ) + { + ConnectionPool pool = mock( ConnectionPool.class ); + when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections ); + return new PooledConnectionValidator( pool ); + } + + private static PooledConnection newConnectionMock() + { + PooledConnection connection = mock( PooledConnection.class ); + when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT ); + return connection; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java index d761981469..779ce4bf1b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java @@ -24,7 +24,9 @@ import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -42,11 +44,13 @@ import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Value; import static java.util.Collections.newSetFromMap; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; @@ -56,6 +60,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -64,6 +69,7 @@ import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT; import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; +import static org.neo4j.driver.v1.Values.value; public class SocketConnectionPoolTest { @@ -489,6 +495,98 @@ public void acquireRetriesUntilAConnectionIsCreated() inOrder.verify( connection4, never() ).sync(); } + @Test + public void reportActiveConnectionsWhenEmpty() + { + SocketConnectionPool pool = newPool( newMockConnector() ); + + int activeConnections1 = pool.activeConnections( ADDRESS_1 ); + int activeConnections2 = pool.activeConnections( ADDRESS_2 ); + int activeConnections3 = pool.activeConnections( ADDRESS_3 ); + + assertEquals( 0, activeConnections1 ); + assertEquals( 0, activeConnections2 ); + assertEquals( 0, activeConnections3 ); + } + + @Test + public void reportActiveConnectionsWhenHasAcquiredConnections() + { + int acquiredConnections = 23; + SocketConnectionPool pool = newPool( newMockConnector() ); + + for ( int i = 0; i < acquiredConnections; i++ ) + { + assertNotNull( pool.acquire( ADDRESS_1 ) ); + } + + assertEquals( acquiredConnections, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void reportActiveConnectionsWhenHasIdleConnections() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + Connector connector = newMockConnector( connection ); + SocketConnectionPool pool = newPool( connector ); + + PooledConnection connection1 = pool.acquire( ADDRESS_1 ); + PooledConnection connection2 = pool.acquire( ADDRESS_1 ); + + assertEquals( 2, pool.activeConnections( ADDRESS_1 ) ); + + connection1.close(); + connection2.close(); + + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void shouldForgetBrokenIdleConnection() + { + Connection connection1 = newConnectionMock( ADDRESS_1 ); + Connection connection2 = newConnectionMock( ADDRESS_1 ); + + doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset(); + + int idleTimeBeforeConnectionTest = 42; + FakeClock clock = new FakeClock(); + Connector connector = newMockConnector( connection1, connection2 ); + SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest ); + + // acquire and release one connection + pool.acquire( ADDRESS_1 ).close(); + // make this connection seem idle for too long + clock.progress( idleTimeBeforeConnectionTest + 42 ); + + PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 ); + + Map auth = Collections.singletonMap( "Key", value( "Value" ) ); + acquiredConnection.init( "DummyClient", auth ); + verify( connection1, never() ).init( "DummyClient", auth ); + verify( connection2 ).init( "DummyClient", auth ); + + assertEquals( 1, pool.activeConnections( ADDRESS_1 ) ); + acquiredConnection.close(); + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + } + + @Test + public void shouldForgetIdleConnection() + { + Connection connection = newConnectionMock( ADDRESS_1 ); + doThrow( new RuntimeException() ).when( connection ).reset(); + + SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 ); + PooledConnection pooledConnection = pool.acquire( ADDRESS_1 ); + + // release the connection, it should fail to reset and be disposed + pooledConnection.close(); + + assertEquals( 0, pool.activeConnections( ADDRESS_1 ) ); + verify( connection ).close(); + } + private static Answer createConnectionAnswer( final Set createdConnections ) { return new Answer()