Skip to content

Commit e82457a

Browse files
authored
Merge pull request #401 from zhenlineo/1.4-conn-leak
Backport commit from 1.5 to 1.4: Improve disposal of broken connections
2 parents 44e6a3f + b2f452e commit e82457a

File tree

6 files changed

+281
-4
lines changed

6 files changed

+281
-4
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,15 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
105105
return connection;
106106
}
107107

108-
public List<PooledConnection> toList()
108+
public int activeConnections()
109109
{
110-
return new ArrayList<>( queue );
110+
return acquiredConnections.size();
111+
}
112+
113+
void disposeBroken( PooledConnection connection )
114+
{
115+
acquiredConnections.remove( connection );
116+
disposeSafely( connection );
111117
}
112118

113119
public boolean isEmpty()

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void accept( PooledConnection pooledConnection )
4747
}
4848
else
4949
{
50-
pooledConnection.dispose();
50+
connections.disposeBroken( pooledConnection );
5151
}
5252
}
5353
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ public void close()
109109
}
110110
}
111111

112+
public int activeConnections( BoltServerAddress address )
113+
{
114+
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
115+
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
116+
}
117+
112118
private BlockingPooledConnectionQueue pool( BoltServerAddress address )
113119
{
114120
BlockingPooledConnectionQueue pool = pools.get( address );
@@ -130,10 +136,16 @@ private PooledConnection acquireConnection( BoltServerAddress address,
130136
{
131137
ConnectionSupplier connectionSupplier = new ConnectionSupplier( connectionQueue, address );
132138

133-
PooledConnection connection;
139+
PooledConnection connection = null;
134140
boolean connectionCreated;
135141
do
136142
{
143+
// dispose previous connection that can't be acquired
144+
if ( connection != null )
145+
{
146+
connectionQueue.disposeBroken( connection );
147+
}
148+
137149
connection = connectionQueue.acquire( connectionSupplier );
138150
connectionCreated = connectionSupplier.connectionCreated();
139151
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueueTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,67 @@ public void shouldTerminateBothAcquiredAndIdleConnections()
239239
verify( connection4 ).dispose();
240240
}
241241

242+
@Test
243+
public void shouldReportZeroActiveConnectionsWhenEmpty()
244+
{
245+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
246+
247+
assertEquals( 0, queue.activeConnections() );
248+
}
249+
250+
@Test
251+
public void shouldReportZeroActiveConnectionsWhenHasOnlyIdleConnections()
252+
{
253+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
254+
255+
queue.offer( mock( PooledConnection.class ) );
256+
queue.offer( mock( PooledConnection.class ) );
257+
258+
assertEquals( 0, queue.activeConnections() );
259+
}
260+
261+
@Test
262+
@SuppressWarnings( "unchecked" )
263+
public void shouldReportActiveConnections()
264+
{
265+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
266+
267+
PooledConnection connection1 = mock( PooledConnection.class );
268+
PooledConnection connection2 = mock( PooledConnection.class );
269+
PooledConnection connection3 = mock( PooledConnection.class );
270+
271+
queue.offer( connection1 );
272+
queue.offer( connection2 );
273+
queue.offer( connection3 );
274+
275+
queue.acquire( mock( Supplier.class ) );
276+
queue.acquire( mock( Supplier.class ) );
277+
queue.acquire( mock( Supplier.class ) );
278+
279+
assertEquals( 3, queue.activeConnections() );
280+
281+
queue.offer( connection1 );
282+
queue.offer( connection2 );
283+
queue.offer( connection3 );
284+
285+
assertEquals( 0, queue.activeConnections() );
286+
}
287+
288+
@Test
289+
@SuppressWarnings( "unchecked" )
290+
public void shouldDisposeBrokenConnections()
291+
{
292+
BlockingPooledConnectionQueue queue = newConnectionQueue( 5 );
293+
294+
queue.offer( mock( PooledConnection.class ) );
295+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
296+
assertEquals( 1, queue.activeConnections() );
297+
298+
queue.disposeBroken( connection );
299+
assertEquals( 0, queue.activeConnections() );
300+
verify( connection ).dispose();
301+
}
302+
242303
private static BlockingPooledConnectionQueue newConnectionQueue( int capacity )
243304
{
244305
return newConnectionQueue( capacity, mock( Logging.class, RETURNS_MOCKS ) );
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import org.junit.Test;
22+
23+
import org.neo4j.driver.internal.spi.ConnectionPool;
24+
import org.neo4j.driver.internal.spi.PooledConnection;
25+
import org.neo4j.driver.internal.util.Supplier;
26+
27+
import static org.junit.Assert.assertEquals;
28+
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.when;
31+
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
32+
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
33+
34+
public class PooledConnectionReleaseConsumerTest
35+
{
36+
@Test
37+
public void shouldOfferReusableConnectionsBackToTheConnectionsQueue()
38+
{
39+
BlockingPooledConnectionQueue queue = newConnectionQueue();
40+
PooledConnection connection = acquireConnection( queue );
41+
42+
PooledConnectionValidator validator = newConnectionValidator( true );
43+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
44+
45+
releaseConsumer.accept( connection );
46+
47+
// connection should now be idle
48+
assertEquals( 0, queue.activeConnections() );
49+
assertEquals( 1, queue.size() );
50+
51+
verify( connection ).reset();
52+
verify( connection ).sync();
53+
}
54+
55+
@Test
56+
public void shouldAskConnectionsQueueToDisposeNotReusableConnections()
57+
{
58+
BlockingPooledConnectionQueue queue = newConnectionQueue();
59+
PooledConnection connection = acquireConnection( queue );
60+
61+
PooledConnectionValidator validator = newConnectionValidator( false );
62+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( queue, validator );
63+
64+
releaseConsumer.accept( connection );
65+
66+
// connection should've been disposed
67+
assertEquals( 0, queue.activeConnections() );
68+
assertEquals( 0, queue.size() );
69+
70+
verify( connection ).dispose();
71+
}
72+
73+
private static BlockingPooledConnectionQueue newConnectionQueue()
74+
{
75+
return new BlockingPooledConnectionQueue( LOCAL_DEFAULT, 5, DEV_NULL_LOGGING );
76+
}
77+
78+
@SuppressWarnings( "unchecked" )
79+
private static PooledConnection acquireConnection( BlockingPooledConnectionQueue queue )
80+
{
81+
queue.offer( newConnectionMock() );
82+
PooledConnection connection = queue.acquire( mock( Supplier.class ) );
83+
assertEquals( 1, queue.activeConnections() );
84+
return connection;
85+
}
86+
87+
private static PooledConnectionValidator newConnectionValidator( boolean allowsConnections )
88+
{
89+
ConnectionPool pool = mock( ConnectionPool.class );
90+
when( pool.hasAddress( LOCAL_DEFAULT ) ).thenReturn( allowsConnections );
91+
return new PooledConnectionValidator( pool );
92+
}
93+
94+
private static PooledConnection newConnectionMock()
95+
{
96+
PooledConnection connection = mock( PooledConnection.class );
97+
when( connection.boltServerAddress() ).thenReturn( LOCAL_DEFAULT );
98+
return connection;
99+
}
100+
}

driver/src/test/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPoolTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.mockito.stubbing.Answer;
2525

2626
import java.util.ArrayList;
27+
import java.util.Collections;
2728
import java.util.List;
29+
import java.util.Map;
2830
import java.util.Set;
2931
import java.util.concurrent.Callable;
3032
import java.util.concurrent.ConcurrentHashMap;
@@ -42,11 +44,13 @@
4244
import org.neo4j.driver.internal.util.Clock;
4345
import org.neo4j.driver.internal.util.FakeClock;
4446
import org.neo4j.driver.v1.Logging;
47+
import org.neo4j.driver.v1.Value;
4548

4649
import static java.util.Collections.newSetFromMap;
4750
import static org.hamcrest.Matchers.instanceOf;
4851
import static org.hamcrest.Matchers.isOneOf;
4952
import static org.hamcrest.Matchers.not;
53+
import static org.junit.Assert.assertEquals;
5054
import static org.junit.Assert.assertFalse;
5155
import static org.junit.Assert.assertNotNull;
5256
import static org.junit.Assert.assertSame;
@@ -56,6 +60,7 @@
5660
import static org.mockito.Matchers.any;
5761
import static org.mockito.Mockito.RETURNS_MOCKS;
5862
import static org.mockito.Mockito.doNothing;
63+
import static org.mockito.Mockito.doThrow;
5964
import static org.mockito.Mockito.inOrder;
6065
import static org.mockito.Mockito.mock;
6166
import static org.mockito.Mockito.never;
@@ -64,6 +69,7 @@
6469
import static org.mockito.Mockito.when;
6570
import static org.neo4j.driver.internal.net.BoltServerAddress.DEFAULT_PORT;
6671
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
72+
import static org.neo4j.driver.v1.Values.value;
6773

6874
public class SocketConnectionPoolTest
6975
{
@@ -489,6 +495,98 @@ public void acquireRetriesUntilAConnectionIsCreated()
489495
inOrder.verify( connection4, never() ).sync();
490496
}
491497

498+
@Test
499+
public void reportActiveConnectionsWhenEmpty()
500+
{
501+
SocketConnectionPool pool = newPool( newMockConnector() );
502+
503+
int activeConnections1 = pool.activeConnections( ADDRESS_1 );
504+
int activeConnections2 = pool.activeConnections( ADDRESS_2 );
505+
int activeConnections3 = pool.activeConnections( ADDRESS_3 );
506+
507+
assertEquals( 0, activeConnections1 );
508+
assertEquals( 0, activeConnections2 );
509+
assertEquals( 0, activeConnections3 );
510+
}
511+
512+
@Test
513+
public void reportActiveConnectionsWhenHasAcquiredConnections()
514+
{
515+
int acquiredConnections = 23;
516+
SocketConnectionPool pool = newPool( newMockConnector() );
517+
518+
for ( int i = 0; i < acquiredConnections; i++ )
519+
{
520+
assertNotNull( pool.acquire( ADDRESS_1 ) );
521+
}
522+
523+
assertEquals( acquiredConnections, pool.activeConnections( ADDRESS_1 ) );
524+
}
525+
526+
@Test
527+
public void reportActiveConnectionsWhenHasIdleConnections()
528+
{
529+
Connection connection = newConnectionMock( ADDRESS_1 );
530+
Connector connector = newMockConnector( connection );
531+
SocketConnectionPool pool = newPool( connector );
532+
533+
PooledConnection connection1 = pool.acquire( ADDRESS_1 );
534+
PooledConnection connection2 = pool.acquire( ADDRESS_1 );
535+
536+
assertEquals( 2, pool.activeConnections( ADDRESS_1 ) );
537+
538+
connection1.close();
539+
connection2.close();
540+
541+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
542+
}
543+
544+
@Test
545+
public void shouldForgetBrokenIdleConnection()
546+
{
547+
Connection connection1 = newConnectionMock( ADDRESS_1 );
548+
Connection connection2 = newConnectionMock( ADDRESS_1 );
549+
550+
doNothing().doThrow( new RuntimeException() ).when( connection1 ).reset();
551+
552+
int idleTimeBeforeConnectionTest = 42;
553+
FakeClock clock = new FakeClock();
554+
Connector connector = newMockConnector( connection1, connection2 );
555+
SocketConnectionPool pool = newPool( connector, clock, idleTimeBeforeConnectionTest );
556+
557+
// acquire and release one connection
558+
pool.acquire( ADDRESS_1 ).close();
559+
// make this connection seem idle for too long
560+
clock.progress( idleTimeBeforeConnectionTest + 42 );
561+
562+
PooledConnection acquiredConnection = pool.acquire( ADDRESS_1 );
563+
564+
Map<String,Value> auth = Collections.singletonMap( "Key", value( "Value" ) );
565+
acquiredConnection.init( "DummyClient", auth );
566+
verify( connection1, never() ).init( "DummyClient", auth );
567+
verify( connection2 ).init( "DummyClient", auth );
568+
569+
assertEquals( 1, pool.activeConnections( ADDRESS_1 ) );
570+
acquiredConnection.close();
571+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
572+
}
573+
574+
@Test
575+
public void shouldForgetIdleConnection()
576+
{
577+
Connection connection = newConnectionMock( ADDRESS_1 );
578+
doThrow( new RuntimeException() ).when( connection ).reset();
579+
580+
SocketConnectionPool pool = newPool( newMockConnector( connection ), new FakeClock(), 42 );
581+
PooledConnection pooledConnection = pool.acquire( ADDRESS_1 );
582+
583+
// release the connection, it should fail to reset and be disposed
584+
pooledConnection.close();
585+
586+
assertEquals( 0, pool.activeConnections( ADDRESS_1 ) );
587+
verify( connection ).close();
588+
}
589+
492590
private static Answer<Connection> createConnectionAnswer( final Set<Connection> createdConnections )
493591
{
494592
return new Answer<Connection>()

0 commit comments

Comments
 (0)