Skip to content

Backport commit from 1.5 to 1.4: Improve disposal of broken connections #401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,15 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
return connection;
}

public List<PooledConnection> toList()
public int activeConnections()
{
return new ArrayList<>( queue );
return acquiredConnections.size();
}

void disposeBroken( PooledConnection connection )
{
acquiredConnections.remove( connection );
disposeSafely( connection );
}

public boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection )
}
else
{
pooledConnection.dispose();
connections.disposeBroken( pooledConnection );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ public void close()
}
}

public int activeConnections( BoltServerAddress address )
{
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
}

private BlockingPooledConnectionQueue pool( BoltServerAddress address )
{
BlockingPooledConnectionQueue pool = pools.get( address );
Expand All @@ -130,10 +136,16 @@ private PooledConnection acquireConnection( BoltServerAddress address,
{
ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address );

PooledConnection connection;
PooledConnection connection = null;
boolean connectionCreated;
do
{
// dispose previous connection that can't be acquired
if ( connection != null )
{
connectionQueue.disposeBroken( connection );
}

connection = connectionQueue.acquire( connectionSupplier );
connectionCreated = connectionSupplier.connectionCreated();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,67 @@ public void shouldTerminateBothAcquiredAndIdleConnections()
verify( connection4 ).dispose();
}

@Test
public void shouldReportZeroActiveConnectionsWhenEmpty()
{
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );

assertEquals( 0, queue.activeConnections() );
}

@Test
public void shouldReportZeroActiveConnectionsWhenHasOnlyIdleConnections()
{
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );

queue.offer( mock( PooledConnection.class ) );
queue.offer( mock( PooledConnection.class ) );

assertEquals( 0, queue.activeConnections() );
}

@Test
@SuppressWarnings( "unchecked" )
public void shouldReportActiveConnections()
{
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );

PooledConnection connection1 = mock( PooledConnection.class );
PooledConnection connection2 = mock( PooledConnection.class );
PooledConnection connection3 = mock( PooledConnection.class );

queue.offer( connection1 );
queue.offer( connection2 );
queue.offer( connection3 );

queue.acquire( mock( Supplier.class ) );
queue.acquire( mock( Supplier.class ) );
queue.acquire( mock( Supplier.class ) );

assertEquals( 3, queue.activeConnections() );

queue.offer( connection1 );
queue.offer( connection2 );
queue.offer( connection3 );

assertEquals( 0, queue.activeConnections() );
}

@Test
@SuppressWarnings( "unchecked" )
public void shouldDisposeBrokenConnections()
{
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );

queue.offer( mock( PooledConnection.class ) );
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
assertEquals( 1, queue.activeConnections() );

queue.disposeBroken( connection );
assertEquals( 0, queue.activeConnections() );
verify( connection ).dispose();
}

private static BlockingPooledConnectionQueue newConnectionQueue( int capacity )
{
return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.net.pooling;

import org.junit.Test;

import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.internal.util.Supplier;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;

public class PooledConnectionReleaseConsumerTest
{
@Test
public void shouldOfferReusableConnectionsBackToTheConnectionsQueue()
{
BlockingPooledConnectionQueue queue = newConnectionQueue();
PooledConnection connection = acquireConnection( queue );

PooledConnectionValidator validator = newConnectionValidator( true );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );

releaseConsumer.accept( connection );

// connection should now be idle
assertEquals( 0, queue.activeConnections() );
assertEquals( 1, queue.size() );

verify( connection ).reset();
verify( connection ).sync();
}

@Test
public void shouldAskConnectionsQueueToDisposeNotReusableConnections()
{
BlockingPooledConnectionQueue queue = newConnectionQueue();
PooledConnection connection = acquireConnection( queue );

PooledConnectionValidator validator = newConnectionValidator( false );
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );

releaseConsumer.accept( connection );

// connection should've been disposed
assertEquals( 0, queue.activeConnections() );
assertEquals( 0, queue.size() );

verify( connection ).dispose();
}

private static BlockingPooledConnectionQueue newConnectionQueue()
{
return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING );
}

@SuppressWarnings( "unchecked" )
private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue )
{
queue.offer( newConnectionMock() );
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
assertEquals( 1, queue.activeConnections() );
return connection;
}

private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections )
{
ConnectionPool pool = mock( ConnectionPool.class );
when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections );
return new PooledConnectionValidator( pool );
}

private static PooledConnection newConnectionMock()
{
PooledConnection connection = mock( PooledConnection.class );
when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT );
return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -42,11 +44,13 @@
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.FakeClock;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Value;

import static java.util.Collections.newSetFromMap;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
Expand All @@ -56,6 +60,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -64,6 +69,7 @@
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT;
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.v1.Values.value;

public class SocketConnectionPoolTest
{
Expand Down Expand Up @@ -489,6 +495,98 @@ public void acquireRetriesUntilAConnectionIsCreated()
inOrder.verify( connection4, never() ).sync();
}

@Test
public void reportActiveConnectionsWhenEmpty()
{
SocketConnectionPool pool = newPool( newMockConnector() );

int activeConnections1 = pool.activeConnections( ADDRESS_1 );
int activeConnections2 = pool.activeConnections( ADDRESS_2 );
int activeConnections3 = pool.activeConnections( ADDRESS_3 );

assertEquals( 0, activeConnections1 );
assertEquals( 0, activeConnections2 );
assertEquals( 0, activeConnections3 );
}

@Test
public void reportActiveConnectionsWhenHasAcquiredConnections()
{
int acquiredConnections = 23;
SocketConnectionPool pool = newPool( newMockConnector() );

for ( int i = 0; i < acquiredConnections; i++ )
{
assertNotNull( pool.acquire( ADDRESS_1 ) );
}

assertEquals( acquiredConnections, pool.activeConnections( ADDRESS_1 ) );
}

@Test
public void reportActiveConnectionsWhenHasIdleConnections()
{
Connection connection = newConnectionMock( ADDRESS_1 );
Connector connector = newMockConnector( connection );
SocketConnectionPool pool = newPool( connector );

PooledConnection connection1 = pool.acquire( ADDRESS_1 );
PooledConnection connection2 = pool.acquire( ADDRESS_1 );

assertEquals( 2, pool.activeConnections( ADDRESS_1 ) );

connection1.close();
connection2.close();

assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
}

@Test
public void shouldForgetBrokenIdleConnection()
{
Connection connection1 = newConnectionMock( ADDRESS_1 );
Connection connection2 = newConnectionMock( ADDRESS_1 );

doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset();

int idleTimeBeforeConnectionTest = 42;
FakeClock clock = new FakeClock();
Connector connector = newMockConnector( connection1, connection2 );
SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest );

// acquire and release one connection
pool.acquire( ADDRESS_1 ).close();
// make this connection seem idle for too long
clock.progress( idleTimeBeforeConnectionTest + 42 );

PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 );

Map<String,Value> auth = Collections.singletonMap( "Key", value( "Value" ) );
acquiredConnection.init( "DummyClient", auth );
verify( connection1, never() ).init( "DummyClient", auth );
verify( connection2 ).init( "DummyClient", auth );

assertEquals( 1, pool.activeConnections( ADDRESS_1 ) );
acquiredConnection.close();
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
}

@Test
public void shouldForgetIdleConnection()
{
Connection connection = newConnectionMock( ADDRESS_1 );
doThrow( new RuntimeException() ).when( connection ).reset();

SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 );
PooledConnection pooledConnection = pool.acquire( ADDRESS_1 );

// release the connection, it should fail to reset and be disposed
pooledConnection.close();

assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
verify( connection ).close();
}

private static Answer<Connection> createConnectionAnswer( final Set<Connection> createdConnections )
{
return new Answer<Connection>()
Expand Down