Skip to content

Commit e9d44ff

Browse files
committed
Properly close "forgotten connections"
Whenever we do rediscovery and the server forces us to discard all connections to endpoints no longer valid in the servers point-of-view, there is a chance there are still open connections to those endpoints and when closing these open sessions we must make sure we properly close these connections without putting them back to the connection pool.
1 parent 2a27bf2 commit e9d44ff

File tree

9 files changed

+279
-95
lines changed

9 files changed

+279
-95
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,11 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.HashMap;
22-
import java.util.Map;
2321
import java.util.concurrent.BlockingQueue;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523

26-
import org.neo4j.driver.internal.spi.Collector;
2724
import org.neo4j.driver.internal.util.Consumer;
28-
import org.neo4j.driver.v1.Value;
25+
import org.neo4j.driver.v1.util.Function;
2926

3027
/**
3128
* The responsibility of the PooledConnectionReleaseConsumer is to release valid connections
@@ -34,16 +31,15 @@
3431
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3532
{
3633
private final BlockingQueue<PooledConnection> connections;
37-
private final long minIdleBeforeConnectionTest;
38-
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
3934
private final AtomicBoolean driverStopped;
35+
private final Function<PooledConnection, Boolean> validConnection;
4036

4137
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
42-
PoolSettings poolSettings)
38+
Function<PooledConnection, Boolean> validConnection)
4339
{
4440
this.connections = connections;
4541
this.driverStopped = driverStopped;
46-
this.minIdleBeforeConnectionTest = poolSettings.idleTimeBeforeConnectionTest();
42+
this.validConnection = validConnection;
4743
}
4844

4945
@Override
@@ -54,7 +50,7 @@ public void accept( PooledConnection pooledConnection )
5450
// if the driver already closed, then no need to try to return to pool, just directly close this connection
5551
pooledConnection.dispose();
5652
}
57-
else if ( validConnection( pooledConnection ) )
53+
else if ( validConnection.apply( pooledConnection ) )
5854
{
5955
boolean released = connections.offer( pooledConnection );
6056
if( !released )
@@ -83,48 +79,4 @@ else if ( driverStopped.get() )
8379
pooledConnection.dispose();
8480
}
8581
}
86-
87-
boolean validConnection( PooledConnection pooledConnection )
88-
{
89-
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
90-
// and we should close the conn without bothering to reset the conn at all
91-
return !pooledConnection.hasUnrecoverableErrors() &&
92-
reset(pooledConnection) &&
93-
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));
94-
}
95-
96-
/**
97-
* In case this session has an open result or transaction or something,
98-
* make sure it's reset to a nice state before we reuse it.
99-
* @param conn the PooledConnection
100-
* @return true if the connection is reset successfully without any error, otherwise false.
101-
*/
102-
private boolean reset( PooledConnection conn )
103-
{
104-
try
105-
{
106-
conn.reset();
107-
conn.sync();
108-
return true;
109-
}
110-
catch ( Throwable e )
111-
{
112-
return false;
113-
}
114-
}
115-
116-
private boolean ping( PooledConnection conn )
117-
{
118-
try
119-
{
120-
conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP );
121-
conn.pullAll( Collector.NO_OP );
122-
conn.sync();
123-
return true;
124-
}
125-
catch ( Throwable e )
126-
{
127-
return false;
128-
}
129-
}
13082
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.neo4j.driver.internal.spi.Collector;
25+
import org.neo4j.driver.internal.spi.ConnectionPool;
26+
import org.neo4j.driver.v1.Value;
27+
import org.neo4j.driver.v1.util.Function;
28+
29+
class PooledConnectionValidator implements Function<PooledConnection,Boolean>
30+
{
31+
private final ConnectionPool pool;
32+
private final PoolSettings poolSettings;
33+
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
34+
35+
PooledConnectionValidator( ConnectionPool pool, PoolSettings poolSettings )
36+
{
37+
this.pool = pool;
38+
this.poolSettings = poolSettings;
39+
}
40+
41+
@Override
42+
public Boolean apply( PooledConnection pooledConnection )
43+
{
44+
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
45+
// and we should close the conn without bothering to reset the conn at all
46+
return pool.hasAddress( pooledConnection.address() ) &&
47+
!pooledConnection.hasUnrecoverableErrors() &&
48+
reset( pooledConnection ) &&
49+
(pooledConnection.idleTime() <= poolSettings.idleTimeBeforeConnectionTest() ||
50+
ping( pooledConnection ));
51+
}
52+
53+
/**
54+
* In case this session has an open result or transaction or something,
55+
* make sure it's reset to a nice state before we reuse it.
56+
*
57+
* @param conn the PooledConnection
58+
* @return true if the connection is reset successfully without any error, otherwise false.
59+
*/
60+
private boolean reset( PooledConnection conn )
61+
{
62+
try
63+
{
64+
conn.reset();
65+
conn.sync();
66+
return true;
67+
}
68+
catch ( Throwable e )
69+
{
70+
return false;
71+
}
72+
}
73+
74+
private boolean ping( PooledConnection conn )
75+
{
76+
try
77+
{
78+
conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP );
79+
conn.pullAll( Collector.NO_OP );
80+
conn.sync();
81+
return true;
82+
}
83+
catch ( Throwable e )
84+
{
85+
return false;
86+
}
87+
}
88+
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public Connection acquire( BoltServerAddress address )
113113
if ( conn == null )
114114
{
115115
conn = new PooledConnection( connect( address ), new
116-
PooledConnectionReleaseConsumer( connections, stopped, poolSettings ), clock );
116+
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
117117
}
118118
conn.updateUsageTimestamp();
119119
return conn;

driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import org.neo4j.driver.internal.logging.ConsoleLogging;
3838
import org.neo4j.driver.internal.net.BoltServerAddress;
39+
import org.neo4j.driver.internal.spi.Connection;
3940
import org.neo4j.driver.v1.AccessMode;
4041
import org.neo4j.driver.v1.Config;
4142
import org.neo4j.driver.v1.GraphDatabase;
@@ -46,6 +47,7 @@
4647
import org.neo4j.driver.v1.util.Function;
4748
import org.neo4j.driver.v1.util.StubServer;
4849

50+
import static org.hamcrest.Matchers.contains;
4951
import static org.hamcrest.Matchers.containsInAnyOrder;
5052
import static org.hamcrest.Matchers.hasItem;
5153
import static org.hamcrest.Matchers.hasSize;
@@ -338,7 +340,7 @@ public void shouldRediscoverIfNecessaryOnSessionAcquisition()
338340

339341
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
340342
//START a read server
341-
StubServer.start( resource( "read_server.script" ), 9005 );
343+
StubServer read = StubServer.start( resource( "empty.script" ), 9005 );
342344

343345
//On creation we only find ourselves
344346
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
@@ -356,6 +358,7 @@ public void shouldRediscoverIfNecessaryOnSessionAcquisition()
356358

357359
// Finally
358360
assertThat( server.exitStatus(), equalTo( 0 ) );
361+
assertThat( read.exitStatus(), equalTo( 0 ) );
359362
}
360363

361364
@Test
@@ -366,7 +369,7 @@ public void shouldOnlyGetServersOnce() throws IOException, InterruptedException,
366369

367370
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
368371
//START a read server
369-
StubServer.start( resource( "read_server.script" ), 9005 );
372+
StubServer read = StubServer.start( resource( "empty.script" ), 9005 );
370373

371374
//On creation we only find ourselves
372375
final ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
@@ -397,6 +400,7 @@ public void run()
397400

398401
// Finally
399402
assertThat( server.exitStatus(), equalTo( 0 ) );
403+
assertThat( read.exitStatus(), equalTo( 0 ) );
400404
}
401405

402406
@Test
@@ -455,6 +459,90 @@ public void shouldHandleLeaderSwitchWhenWriting()
455459
assertThat( server.exitStatus(), equalTo( 0 ) );
456460
}
457461

462+
@Test
463+
public void shouldRediscoverOnExpiry() throws IOException, InterruptedException, StubServer.ForceKilled
464+
{
465+
// Given
466+
StubServer server = StubServer.start( resource( "expire.script" ), 9001 );
467+
468+
//START a read server
469+
StubServer readServer = StubServer.start( resource( "empty.script" ), 9005 );
470+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
471+
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
472+
assertThat(driver.routingServers(), contains(address( 9001 )));
473+
assertThat(driver.readServers(), contains(address( 9002 )));
474+
assertThat(driver.writeServers(), contains(address( 9003 )));
475+
476+
//On acquisition we should update our view
477+
Session session = driver.session( AccessMode.READ );
478+
assertThat(driver.routingServers(), contains(address( 9004 )));
479+
assertThat(driver.readServers(), contains(address( 9005 )));
480+
assertThat(driver.writeServers(), contains(address( 9006 )));
481+
session.close();
482+
driver.close();
483+
// Finally
484+
assertThat( server.exitStatus(), equalTo( 0 ) );
485+
assertThat( readServer.exitStatus(), equalTo( 0 ) );
486+
}
487+
488+
@Test
489+
public void shouldNotPutBackPurgedConnection() throws IOException, InterruptedException, StubServer.ForceKilled
490+
{
491+
// Given
492+
StubServer server = StubServer.start( resource( "not_reuse_connection.script" ), 9001 );
493+
494+
//START servers
495+
StubServer readServer = StubServer.start( resource( "empty.script" ), 9002 );
496+
StubServer writeServer1 = StubServer.start( resource( "dead_server.script" ), 9003 );
497+
StubServer writeServer2 = StubServer.start( resource( "empty.script" ), 9006 );
498+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
499+
500+
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
501+
502+
503+
//Open both a read and a write session
504+
Session readSession = driver.session( AccessMode.READ );
505+
Session writeSession = driver.session( AccessMode.WRITE );
506+
507+
try
508+
{
509+
writeSession.run( "MATCH (n) RETURN n.name" );
510+
writeSession.close();
511+
fail();
512+
}
513+
catch (SessionExpiredException e)
514+
{
515+
//ignore
516+
}
517+
//We now lost all write servers
518+
assertThat(driver.writeServers(), hasSize( 0 ));
519+
520+
//reacquiring will trow out the current read server at 9002
521+
writeSession = driver.session( AccessMode.WRITE );
522+
523+
assertThat(driver.routingServers(), contains(address( 9004 )));
524+
assertThat(driver.readServers(), contains(address( 9005 )));
525+
assertThat(driver.writeServers(), contains(address( 9006 )));
526+
assertFalse(driver.connectionPool().hasAddress(address( 9002 ) ));
527+
528+
// now we close the read session and the connection should not be put
529+
// back to the pool
530+
Connection connection = ((ClusteredNetworkSession) readSession).connection;
531+
assertTrue( connection.isOpen() );
532+
readSession.close();
533+
assertFalse( connection.isOpen() );
534+
assertFalse(driver.connectionPool().hasAddress(address( 9002 ) ));
535+
writeSession.close();
536+
537+
driver.close();
538+
539+
// Finally
540+
assertThat( server.exitStatus(), equalTo( 0 ) );
541+
assertThat( readServer.exitStatus(), equalTo( 0 ) );
542+
assertThat( writeServer1.exitStatus(), equalTo( 0 ) );
543+
assertThat( writeServer2.exitStatus(), equalTo( 0 ) );
544+
}
545+
458546
String resource( String fileName )
459547
{
460548
URL resource = ClusterDriverStubTest.class.getClassLoader().getResource( fileName );

0 commit comments

Comments
 (0)