Skip to content

Commit b2f452e

Browse files
lutovichZhen
authored and
Zhen
committed
Backport commit from 1.5 to 1.4: Improve disposal of broken connections
Connection pool can discard broken connections during acquisition (when connection liveness check timeout is configured) and when connections are returned to the pool. In both cases connections should be disposed and removed from the set of active connections. This is especially important with least connected load balancing strategy which examines amount of active connections for each address. This commit makes sure broken connections are disposed through the connections queue to make sure active set is always updated.
1 parent 44e6a3f commit b2f452e

File tree

6 files changed

+281
-4
lines changed

6 files changed

+281
-4
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,15 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
105105
return connection;
106106
}
107107

108-
public List<PooledConnection> toList()
108+
public int activeConnections()
109109
{
110-
return new ArrayList<>( queue );
110+
return acquiredConnections.size();
111+
}
112+
113+
void disposeBroken( PooledConnection connection )
114+
{
115+
acquiredConnections.remove( connection );
116+
disposeSafely( connection );
111117
}
112118

113119
public boolean isEmpty()

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection )
4747
}
4848
else
4949
{
50-
pooledConnection.dispose();
50+
connections.disposeBroken( pooledConnection );
5151
}
5252
}
5353
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ public void close()
109109
}
110110
}
111111

112+
public int activeConnections( BoltServerAddress address )
113+
{
114+
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
115+
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
116+
}
117+
112118
private BlockingPooledConnectionQueue pool( BoltServerAddress address )
113119
{
114120
BlockingPooledConnectionQueue pool = pools.get( address );
@@ -130,10 +136,16 @@ private PooledConnection acquireConnection( BoltServerAddress address,
130136
{
131137
ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address );
132138

133-
PooledConnection connection;
139+
PooledConnection connection = null;
134140
boolean connectionCreated;
135141
do
136142
{
143+
// dispose previous connection that can't be acquired
144+
if ( connection != null )
145+
{
146+
connectionQueue.disposeBroken( connection );
147+
}
148+
137149
connection = connectionQueue.acquire( connectionSupplier );
138150
connectionCreated = connectionSupplier.connectionCreated();
139151
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,67 @@ public void shouldTerminateBothAcquiredAndIdleConnections()
239239
verify( connection4 ).dispose();
240240
}
241241

242+
@Test
243+
public void shouldReportZeroActiveConnectionsWhenEmpty()
244+
{
245+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
246+
247+
assertEquals( 0, queue.activeConnections() );
248+
}
249+
250+
@Test
251+
public void shouldReportZeroActiveConnectionsWhenHasOnlyIdleConnections()
252+
{
253+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
254+
255+
queue.offer( mock( PooledConnection.class ) );
256+
queue.offer( mock( PooledConnection.class ) );
257+
258+
assertEquals( 0, queue.activeConnections() );
259+
}
260+
261+
@Test
262+
@SuppressWarnings( "unchecked" )
263+
public void shouldReportActiveConnections()
264+
{
265+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
266+
267+
PooledConnection connection1 = mock( PooledConnection.class );
268+
PooledConnection connection2 = mock( PooledConnection.class );
269+
PooledConnection connection3 = mock( PooledConnection.class );
270+
271+
queue.offer( connection1 );
272+
queue.offer( connection2 );
273+
queue.offer( connection3 );
274+
275+
queue.acquire( mock( Supplier.class ) );
276+
queue.acquire( mock( Supplier.class ) );
277+
queue.acquire( mock( Supplier.class ) );
278+
279+
assertEquals( 3, queue.activeConnections() );
280+
281+
queue.offer( connection1 );
282+
queue.offer( connection2 );
283+
queue.offer( connection3 );
284+
285+
assertEquals( 0, queue.activeConnections() );
286+
}
287+
288+
@Test
289+
@SuppressWarnings( "unchecked" )
290+
public void shouldDisposeBrokenConnections()
291+
{
292+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
293+
294+
queue.offer( mock( PooledConnection.class ) );
295+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
296+
assertEquals( 1, queue.activeConnections() );
297+
298+
queue.disposeBroken( connection );
299+
assertEquals( 0, queue.activeConnections() );
300+
verify( connection ).dispose();
301+
}
302+
242303
private static BlockingPooledConnectionQueue newConnectionQueue( int capacity )
243304
{
244305
return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) );
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import org.junit.Test;
22+
23+
import org.neo4j.driver.internal.spi.ConnectionPool;
24+
import org.neo4j.driver.internal.spi.PooledConnection;
25+
import org.neo4j.driver.internal.util.Supplier;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.when;
31+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
32+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
33+
34+
public class PooledConnectionReleaseConsumerTest
35+
{
36+
@Test
37+
public void shouldOfferReusableConnectionsBackToTheConnectionsQueue()
38+
{
39+
BlockingPooledConnectionQueue queue = newConnectionQueue();
40+
PooledConnection connection = acquireConnection( queue );
41+
42+
PooledConnectionValidator validator = newConnectionValidator( true );
43+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
44+
45+
releaseConsumer.accept( connection );
46+
47+
// connection should now be idle
48+
assertEquals( 0, queue.activeConnections() );
49+
assertEquals( 1, queue.size() );
50+
51+
verify( connection ).reset();
52+
verify( connection ).sync();
53+
}
54+
55+
@Test
56+
public void shouldAskConnectionsQueueToDisposeNotReusableConnections()
57+
{
58+
BlockingPooledConnectionQueue queue = newConnectionQueue();
59+
PooledConnection connection = acquireConnection( queue );
60+
61+
PooledConnectionValidator validator = newConnectionValidator( false );
62+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
63+
64+
releaseConsumer.accept( connection );
65+
66+
// connection should've been disposed
67+
assertEquals( 0, queue.activeConnections() );
68+
assertEquals( 0, queue.size() );
69+
70+
verify( connection ).dispose();
71+
}
72+
73+
private static BlockingPooledConnectionQueue newConnectionQueue()
74+
{
75+
return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING );
76+
}
77+
78+
@SuppressWarnings( "unchecked" )
79+
private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue )
80+
{
81+
queue.offer( newConnectionMock() );
82+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
83+
assertEquals( 1, queue.activeConnections() );
84+
return connection;
85+
}
86+
87+
private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections )
88+
{
89+
ConnectionPool pool = mock( ConnectionPool.class );
90+
when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections );
91+
return new PooledConnectionValidator( pool );
92+
}
93+
94+
private static PooledConnection newConnectionMock()
95+
{
96+
PooledConnection connection = mock( PooledConnection.class );
97+
when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT );
98+
return connection;
99+
}
100+
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.mockito.stubbing.Answer;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Set;
2931
import java.util.concurrent.Callable;
3032
import java.util.concurrent.ConcurrentHashMap;
@@ -42,11 +44,13 @@
4244
import org.neo4j.driver.internal.util.Clock;
4345
import org.neo4j.driver.internal.util.FakeClock;
4446
import org.neo4j.driver.v1.Logging;
47+
import org.neo4j.driver.v1.Value;
4548

4649
import static java.util.Collections.newSetFromMap;
4750
import static org.hamcrest.Matchers.instanceOf;
4851
import static org.hamcrest.Matchers.isOneOf;
4952
import static org.hamcrest.Matchers.not;
53+
import static org.junit.Assert.assertEquals;
5054
import static org.junit.Assert.assertFalse;
5155
import static org.junit.Assert.assertNotNull;
5256
import static org.junit.Assert.assertSame;
@@ -56,6 +60,7 @@
5660
import static org.mockito.Matchers.any;
5761
import static org.mockito.Mockito.RETURNS_MOCKS;
5862
import static org.mockito.Mockito.doNothing;
63+
import static org.mockito.Mockito.doThrow;
5964
import static org.mockito.Mockito.inOrder;
6065
import static org.mockito.Mockito.mock;
6166
import static org.mockito.Mockito.never;
@@ -64,6 +69,7 @@
6469
import static org.mockito.Mockito.when;
6570
import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT;
6671
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
72+
import static org.neo4j.driver.v1.Values.value;
6773

6874
public class SocketConnectionPoolTest
6975
{
@@ -489,6 +495,98 @@ public void acquireRetriesUntilAConnectionIsCreated()
489495
inOrder.verify( connection4, never() ).sync();
490496
}
491497

498+
@Test
499+
public void reportActiveConnectionsWhenEmpty()
500+
{
501+
SocketConnectionPool pool = newPool( newMockConnector() );
502+
503+
int activeConnections1 = pool.activeConnections( ADDRESS_1 );
504+
int activeConnections2 = pool.activeConnections( ADDRESS_2 );
505+
int activeConnections3 = pool.activeConnections( ADDRESS_3 );
506+
507+
assertEquals( 0, activeConnections1 );
508+
assertEquals( 0, activeConnections2 );
509+
assertEquals( 0, activeConnections3 );
510+
}
511+
512+
@Test
513+
public void reportActiveConnectionsWhenHasAcquiredConnections()
514+
{
515+
int acquiredConnections = 23;
516+
SocketConnectionPool pool = newPool( newMockConnector() );
517+
518+
for ( int i = 0; i < acquiredConnections; i++ )
519+
{
520+
assertNotNull( pool.acquire( ADDRESS_1 ) );
521+
}
522+
523+
assertEquals( acquiredConnections, pool.activeConnections( ADDRESS_1 ) );
524+
}
525+
526+
@Test
527+
public void reportActiveConnectionsWhenHasIdleConnections()
528+
{
529+
Connection connection = newConnectionMock( ADDRESS_1 );
530+
Connector connector = newMockConnector( connection );
531+
SocketConnectionPool pool = newPool( connector );
532+
533+
PooledConnection connection1 = pool.acquire( ADDRESS_1 );
534+
PooledConnection connection2 = pool.acquire( ADDRESS_1 );
535+
536+
assertEquals( 2, pool.activeConnections( ADDRESS_1 ) );
537+
538+
connection1.close();
539+
connection2.close();
540+
541+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
542+
}
543+
544+
@Test
545+
public void shouldForgetBrokenIdleConnection()
546+
{
547+
Connection connection1 = newConnectionMock( ADDRESS_1 );
548+
Connection connection2 = newConnectionMock( ADDRESS_1 );
549+
550+
doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset();
551+
552+
int idleTimeBeforeConnectionTest = 42;
553+
FakeClock clock = new FakeClock();
554+
Connector connector = newMockConnector( connection1, connection2 );
555+
SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest );
556+
557+
// acquire and release one connection
558+
pool.acquire( ADDRESS_1 ).close();
559+
// make this connection seem idle for too long
560+
clock.progress( idleTimeBeforeConnectionTest + 42 );
561+
562+
PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 );
563+
564+
Map<String,Value> auth = Collections.singletonMap( "Key", value( "Value" ) );
565+
acquiredConnection.init( "DummyClient", auth );
566+
verify( connection1, never() ).init( "DummyClient", auth );
567+
verify( connection2 ).init( "DummyClient", auth );
568+
569+
assertEquals( 1, pool.activeConnections( ADDRESS_1 ) );
570+
acquiredConnection.close();
571+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
572+
}
573+
574+
@Test
575+
public void shouldForgetIdleConnection()
576+
{
577+
Connection connection = newConnectionMock( ADDRESS_1 );
578+
doThrow( new RuntimeException() ).when( connection ).reset();
579+
580+
SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 );
581+
PooledConnection pooledConnection = pool.acquire( ADDRESS_1 );
582+
583+
// release the connection, it should fail to reset and be disposed
584+
pooledConnection.close();
585+
586+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
587+
verify( connection ).close();
588+
}
589+
492590
private static Answer<Connection> createConnectionAnswer( final Set<Connection> createdConnections )
493591
{
494592
return new Answer<Connection>()

0 commit comments

Comments
 (0)