Skip to content

Commit 3004db0

Browse files
committed
Make sure connection is only called once.
Instead of guarding in `PooledConnection` as well as in `NetworkSession` we make sure we only close connection once in `RoutingDriver`
1 parent 5540e80 commit 3004db0

File tree

6 files changed

+45
-49
lines changed

6 files changed

+45
-49
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ public BoltServerAddress apply( Value value )
241241
//must be called from a synchronized method
242242
private boolean call( BoltServerAddress address, String procedureName, Consumer<Record> recorder )
243243
{
244-
Connection acquire = null;
244+
Connection acquire;
245245
Session session = null;
246246
try
247247
{
@@ -271,11 +271,6 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
271271
{
272272
session.close();
273273
}
274-
if ( acquire != null )
275-
{
276-
acquire.close();
277-
}
278-
279274
}
280275
return true;
281276
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.net.pooling;
2020

2121
import java.util.Map;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322

2423
import org.neo4j.driver.internal.net.BoltServerAddress;
2524
import org.neo4j.driver.internal.spi.Collector;
@@ -59,7 +58,6 @@ public class PooledConnection implements Connection
5958
private Runnable onError = null;
6059
private final Clock clock;
6160
private long lastUsed;
62-
private final AtomicBoolean released = new AtomicBoolean( false );
6361

6462
public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
6563
{
@@ -69,9 +67,8 @@ public PooledConnection( Connection delegate, Consumer<PooledConnection> release
6967
this.lastUsed = clock.millis();
7068
}
7169

72-
public void setInUse()
70+
public void updateTimestamp()
7371
{
74-
released.set(false);
7572
lastUsed = clock.millis();
7673
}
7774

@@ -200,10 +197,7 @@ public void receiveOne()
200197
*/
201198
public void close()
202199
{
203-
if ( released.compareAndSet( false, true ))
204-
{
205-
release.accept( this );
206-
}
200+
release.accept( this );
207201
// put the full logic of deciding whether to dispose the connection or to put it back to
208202
// the pool into the release object
209203
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.BlockingQueue;
2325
import java.util.concurrent.ConcurrentHashMap;
@@ -39,6 +41,8 @@
3941
import org.neo4j.driver.v1.Value;
4042
import org.neo4j.driver.v1.exceptions.ClientException;
4143

44+
import static java.util.Collections.emptyList;
45+
4246
/**
4347
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
4448
* try to return the session into the session pool, however if we failed to return it back, either because the pool
@@ -115,7 +119,7 @@ public Connection acquire( BoltServerAddress address )
115119
conn = new PooledConnection( connect( address ), new
116120
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
117121
}
118-
conn.setInUse();
122+
conn.updateTimestamp();
119123
return conn;
120124
}
121125

@@ -184,4 +188,19 @@ public void close()
184188
pools.clear();
185189
}
186190

191+
//for testing
192+
public List<PooledConnection> connectionsForAddress(BoltServerAddress address)
193+
{
194+
LinkedBlockingQueue<PooledConnection> pooledConnections =
195+
(LinkedBlockingQueue<PooledConnection>) pools.get( address );
196+
if (pooledConnections == null)
197+
{
198+
return emptyList();
199+
}
200+
else
201+
{
202+
return new ArrayList<>( pooledConnections );
203+
}
204+
}
205+
187206
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
import org.neo4j.driver.internal.logging.ConsoleLogging;
3737
import org.neo4j.driver.internal.net.BoltServerAddress;
38+
import org.neo4j.driver.internal.net.pooling.PooledConnection;
39+
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
3840
import org.neo4j.driver.internal.spi.Connection;
3941
import org.neo4j.driver.v1.AccessMode;
4042
import org.neo4j.driver.v1.Config;
@@ -86,6 +88,26 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St
8688
assertThat( server.exitStatus(), equalTo( 0 ) );
8789
}
8890

91+
@Test
92+
public void shouldOnlyPutConnectionInPoolOnce() throws IOException, InterruptedException, StubServer.ForceKilled
93+
{
94+
// Given
95+
StubServer server = StubServer.start( "discover_servers.script", 9001 );
96+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
97+
98+
// When
99+
try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) )
100+
{
101+
// Then
102+
SocketConnectionPool pool = (SocketConnectionPool) driver.connectionPool();
103+
List<PooledConnection> pooledConnections = pool.connectionsForAddress( address( 9001 ) );
104+
assertThat(pooledConnections, hasSize( 1 ));
105+
}
106+
107+
// Finally
108+
assertThat( server.exitStatus(), equalTo( 0 ) );
109+
}
110+
89111
@Test
90112
public void shouldDiscoverNewServers() throws IOException, InterruptedException, StubServer.ForceKilled
91113
{

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
4444
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4545
import org.neo4j.driver.v1.summary.ResultSummary;
46-
import org.neo4j.driver.v1.util.BiFunction;
4746
import org.neo4j.driver.v1.util.Function;
4847

4948
import static java.util.Arrays.asList;

driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -120,39 +120,6 @@ public void dispose()
120120
assertThat( flags[0], equalTo( false ) );
121121
}
122122

123-
@Test
124-
public void shouldOnlyReturnOnceEventhougCloseIsBeingCalledMultipleTimes() throws Throwable
125-
{
126-
// Given
127-
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(2);
128-
129-
final boolean[] flags = {false};
130-
131-
Connection conn = mock( Connection.class );
132-
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
133-
new AtomicBoolean( false ), VALID_CONNECTION );
134-
135-
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
136-
{
137-
@Override
138-
public void dispose()
139-
{
140-
flags[0] = true;
141-
}
142-
};
143-
144-
// When
145-
pooledConnection.close();
146-
pooledConnection.close();
147-
pooledConnection.close();
148-
pooledConnection.close();
149-
pooledConnection.close();
150-
151-
// Then
152-
assertThat( pool, hasItem(pooledConnection) );
153-
assertThat( pool.size(), equalTo( 1 ) );
154-
assertThat( flags[0], equalTo( false ) );
155-
}
156123

157124
@Test
158125
public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable

0 commit comments

Comments
 (0)