Skip to content

Commit cdc79ca

Browse files
committed
Don't add the same connection multiple times
The same connections was being added to the queue if close was called more than once. Eventually the queue will be full and we close the connection, but since the queue now contains multiple identical copies we will also close these.
1 parent d17b10a commit cdc79ca

File tree

3 files changed

+43
-3
lines changed

3 files changed

+43
-3
lines changed

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnection.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.net.pooling;
2020

2121
import java.util.Map;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.neo4j.driver.internal.net.BoltServerAddress;
2425
import org.neo4j.driver.internal.spi.Collector;
@@ -58,6 +59,7 @@ public class PooledConnection implements Connection
5859
private Runnable onError = null;
5960
private final Clock clock;
6061
private long lastUsed;
62+
private final AtomicBoolean released = new AtomicBoolean( false );
6163

6264
public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
6365
{
@@ -67,8 +69,9 @@ public PooledConnection( Connection delegate, Consumer<PooledConnection> release
6769
this.lastUsed = clock.millis();
6870
}
6971

70-
public void updateUsageTimestamp()
72+
public void setInUse()
7173
{
74+
released.set(false);
7275
lastUsed = clock.millis();
7376
}
7477

@@ -197,7 +200,10 @@ public void receiveOne()
197200
*/
198201
public void close()
199202
{
200-
release.accept( this );
203+
if ( released.compareAndSet( false, true ))
204+
{
205+
release.accept( this );
206+
}
201207
// put the full logic of deciding whether to dispose the connection or to put it back to
202208
// the pool into the release object
203209
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public Connection acquire( BoltServerAddress address )
115115
conn = new PooledConnection( connect( address ), new
116116
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
117117
}
118-
conn.updateUsageTimestamp();
118+
conn.setInUse();
119119
return conn;
120120
}
121121

driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,40 @@ public void dispose()
120120
assertThat( flags[0], equalTo( false ) );
121121
}
122122

123+
@Test
124+
public void shouldOnlyReturnOnceEventhougCloseIsBeingCalledMultipleTimes() throws Throwable
125+
{
126+
// Given
127+
final BlockingQueue<PooledConnection> pool = new LinkedBlockingQueue<>(2);
128+
129+
final boolean[] flags = {false};
130+
131+
Connection conn = mock( Connection.class );
132+
PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool,
133+
new AtomicBoolean( false ), VALID_CONNECTION );
134+
135+
PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM )
136+
{
137+
@Override
138+
public void dispose()
139+
{
140+
flags[0] = true;
141+
}
142+
};
143+
144+
// When
145+
pooledConnection.close();
146+
pooledConnection.close();
147+
pooledConnection.close();
148+
pooledConnection.close();
149+
pooledConnection.close();
150+
151+
// Then
152+
assertThat( pool, hasItem(pooledConnection) );
153+
assertThat( pool.size(), equalTo( 1 ) );
154+
assertThat( flags[0], equalTo( false ) );
155+
}
156+
123157
@Test
124158
public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws Throwable
125159
{

0 commit comments

Comments
 (0)