Skip to content

Commit ec5a329

Browse files
committed
Properly close "forgotten connections"
Whenever we do rediscovery and the server forces us to discard all connections to endpoints no longer valid in the servers point-of-view, there is a chance there are still open connections to those endpoints and when closing these open sessions we must make sure we properly close these connections without putting them back to the connection pool.
1 parent 2a27bf2 commit ec5a329

File tree

4 files changed

+118
-0
lines changed

4 files changed

+118
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.neo4j.driver.internal.net.pooling;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.neo4j.driver.internal.spi.Collector;
7+
import org.neo4j.driver.internal.spi.ConnectionPool;
8+
import org.neo4j.driver.v1.Value;
9+
import org.neo4j.driver.v1.util.Function;
10+
11+
/**
12+
* Created by pontusmelke on 22/09/16.
13+
*/
14+
class PooledConnectionValidator implements Function<PooledConnection,Boolean>
15+
{
16+
private final ConnectionPool pool;
17+
private final PoolSettings poolSettings;
18+
private static final Map<String,Value> NO_PARAMETERS = new HashMap<>();
19+
20+
PooledConnectionValidator( ConnectionPool pool, PoolSettings poolSettings )
21+
{
22+
this.pool = pool;
23+
this.poolSettings = poolSettings;
24+
}
25+
26+
@Override
27+
public Boolean apply( PooledConnection pooledConnection )
28+
{
29+
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
30+
// and we should close the conn without bothering to reset the conn at all
31+
return pool.hasAddress( pooledConnection.address() ) &&
32+
!pooledConnection.hasUnrecoverableErrors() &&
33+
reset( pooledConnection ) &&
34+
(pooledConnection.idleTime() <= poolSettings.idleTimeBeforeConnectionTest() ||
35+
ping( pooledConnection ));
36+
}
37+
38+
/**
39+
* In case this session has an open result or transaction or something,
40+
* make sure it's reset to a nice state before we reuse it.
41+
*
42+
* @param conn the PooledConnection
43+
* @return true if the connection is reset successfully without any error, otherwise false.
44+
*/
45+
private boolean reset( PooledConnection conn )
46+
{
47+
try
48+
{
49+
conn.reset();
50+
conn.sync();
51+
return true;
52+
}
53+
catch ( Throwable e )
54+
{
55+
return false;
56+
}
57+
}
58+
59+
private boolean ping( PooledConnection conn )
60+
{
61+
try
62+
{
63+
conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP );
64+
conn.pullAll( Collector.NO_OP );
65+
conn.sync();
66+
return true;
67+
}
68+
catch ( Throwable e )
69+
{
70+
return false;
71+
}
72+
}
73+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {}
4+
!: AUTO PULL_ALL
5+
6+
C: RUN "CALL dbms.cluster.routing.getServers" {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [0, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9003"], "role": "WRITE"}]]
10+
SUCCESS {}
11+
C: RUN "CALL dbms.cluster.routing.getServers" {}
12+
PULL_ALL
13+
S: SUCCESS {"fields": ["ttl", "servers"]}
14+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9006"], "role": "WRITE"}]]
15+
SUCCESS {}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {}
4+
!: AUTO PULL_ALL
5+
6+
C: RUN "CALL dbms.cluster.routing.getServers" {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
10+
SUCCESS {}
11+
C: RUN "CALL dbms.cluster.routing.getServers" {}
12+
PULL_ALL
13+
S: SUCCESS {"fields": ["ttl", "servers"]}
14+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9002","127.0.0.1:9003","127.0.0.1:9004"], "role": "ROUTE"}]]
15+
SUCCESS {}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
!: AUTO INIT
2+
!: AUTO RESET
3+
!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {}
4+
!: AUTO PULL_ALL
5+
6+
C: RUN "CALL dbms.cluster.routing.getServers" {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [0, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9003"], "role": "WRITE"}]]
10+
SUCCESS {}
11+
C: RUN "CALL dbms.cluster.routing.getServers" {}
12+
PULL_ALL
13+
S: SUCCESS {"fields": ["ttl", "servers"]}
14+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9006"], "role": "WRITE"}]]
15+
SUCCESS {}

0 commit comments

Comments
 (0)