Skip to content

Commit 790e0dc

Browse files
authored
Improve connection release handling and improve flaky test (#1092)
This update ensures connection release stages are linked. In addition, it improves stability of flaky `RoutingTableAndConnectionPoolTest.shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool` test.
1 parent ef1bce7 commit 790e0dc

File tree

3 files changed

+36
-18
lines changed

3 files changed

+36
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import static java.util.Collections.emptyMap;
4949
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
5050
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
51+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
5152

5253
/**
5354
* This connection represents a simple network connection to a remote server. It wraps a channel obtained from a connection pool. The life cycle of this
@@ -189,10 +190,14 @@ public void terminateAndRelease( String reason )
189190
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
190191
{
191192
setTerminationReason( channel, reason );
192-
channel.close();
193-
channelPool.release( channel );
194-
releaseFuture.complete( null );
195-
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
193+
asCompletionStage( channel.close() )
194+
.exceptionally( throwable -> null )
195+
.thenCompose( ignored -> channelPool.release( channel ) )
196+
.whenComplete( ( ignored, throwable ) ->
197+
{
198+
releaseFuture.complete( null );
199+
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
200+
} );
196201
}
197202
}
198203

driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.neo4j.driver.internal.util.Clock;
2929

3030
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
31+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
32+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3133

3234
public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
3335
{
@@ -47,18 +49,20 @@ public ChannelReleasingResetResponseHandler( Channel channel, ExtendedChannelPoo
4749
@Override
4850
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
4951
{
52+
CompletionStage<Void> closureStage;
5053
if ( success )
5154
{
5255
// update the last-used timestamp before returning the channel back to the pool
5356
setLastUsedTimestamp( channel, clock.millis() );
57+
closureStage = completedWithNull();
5458
}
5559
else
5660
{
5761
// close the channel before returning it back to the pool if RESET failed
58-
channel.close();
62+
closureStage = asCompletionStage( channel.close() );
5963
}
60-
61-
CompletionStage<Void> released = pool.release( channel );
62-
released.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
64+
closureStage.exceptionally( throwable -> null )
65+
.thenCompose( ignored -> pool.release( channel ) )
66+
.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
6367
}
6468
}

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import java.time.Duration;
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
28+
import java.util.Collections;
2829
import java.util.HashSet;
2930
import java.util.LinkedList;
3031
import java.util.List;
32+
import java.util.Objects;
3133
import java.util.Random;
3234
import java.util.Set;
3335
import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,7 @@
4345
import org.neo4j.driver.exceptions.FatalDiscoveryException;
4446
import org.neo4j.driver.exceptions.ProtocolException;
4547
import org.neo4j.driver.internal.BoltServerAddress;
48+
import org.neo4j.driver.internal.DatabaseNameUtil;
4649
import org.neo4j.driver.internal.async.connection.BootstrapFactory;
4750
import org.neo4j.driver.internal.async.pool.NettyChannelHealthChecker;
4851
import org.neo4j.driver.internal.async.pool.NettyChannelTracker;
@@ -85,7 +88,7 @@ class RoutingTableAndConnectionPoolTest
8588
private static final BoltServerAddress D = new BoltServerAddress( "localhost:30003" );
8689
private static final BoltServerAddress E = new BoltServerAddress( "localhost:30004" );
8790
private static final BoltServerAddress F = new BoltServerAddress( "localhost:30005" );
88-
private static final List<BoltServerAddress> SERVERS = new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) );
91+
private static final List<BoltServerAddress> SERVERS = Collections.synchronizedList( new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ) );
8992

9093
private static final String[] DATABASES = new String[]{"", SYSTEM_DATABASE_NAME, "my database"};
9194

@@ -94,7 +97,7 @@ class RoutingTableAndConnectionPoolTest
9497
private final Logging logging = none();
9598

9699
@Test
97-
void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
100+
void shouldAddServerToRoutingTableAndConnectionPool()
98101
{
99102
// Given
100103
ConnectionPool connectionPool = newConnectionPool();
@@ -114,7 +117,7 @@ void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
114117
}
115118

116119
@Test
117-
void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
120+
void shouldNotAddToRoutingTableWhenFailedWithRoutingError()
118121
{
119122
// Given
120123
ConnectionPool connectionPool = newConnectionPool();
@@ -134,7 +137,7 @@ void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
134137
}
135138

136139
@Test
137-
void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
140+
void shouldNotAddToRoutingTableWhenFailedWithProtocolError()
138141
{
139142
// Given
140143
ConnectionPool connectionPool = newConnectionPool();
@@ -154,7 +157,7 @@ void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
154157
}
155158

156159
@Test
157-
void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
160+
void shouldNotAddToRoutingTableWhenFailedWithSecurityError()
158161
{
159162
// Given
160163
ConnectionPool connectionPool = newConnectionPool();
@@ -174,7 +177,7 @@ void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
174177
}
175178

176179
@Test
177-
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
180+
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired()
178181
{
179182
// Given
180183
ConnectionPool connectionPool = newConnectionPool();
@@ -197,7 +200,7 @@ void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
197200
}
198201

199202
@Test
200-
void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
203+
void shouldRemoveExpiredRoutingTableAndServers()
201204
{
202205
// Given
203206
ConnectionPool connectionPool = newConnectionPool();
@@ -224,7 +227,7 @@ void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
224227
}
225228

226229
@Test
227-
void shouldRemoveExpiredRoutingTableButNotServer() throws Throwable
230+
void shouldRemoveExpiredRoutingTableButNotServer()
228231
{
229232
// Given
230233
ConnectionPool connectionPool = newConnectionPool();
@@ -263,7 +266,7 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl
263266
acquireAndReleaseConnections( loadBalancer );
264267
Set<BoltServerAddress> servers = routingTables.allServers();
265268
BoltServerAddress openServer = null;
266-
for( BoltServerAddress server: servers )
269+
for ( BoltServerAddress server : servers )
267270
{
268271
if ( connectionPool.isOpen( server ) )
269272
{
@@ -275,6 +278,8 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl
275278

276279
// if we remove the open server from servers, then the connection pool should remove the server from the pool.
277280
SERVERS.remove( openServer );
281+
// ensure rediscovery is necessary on subsequent interaction
282+
Arrays.stream( DATABASES ).map( DatabaseNameUtil::database ).forEach( routingTables::remove );
278283
acquireAndReleaseConnections( loadBalancer );
279284

280285
assertFalse( connectionPool.isOpen( openServer ) );
@@ -375,7 +380,11 @@ public CompletionStage<ClusterCompositionLookupResult> lookupClusterComposition(
375380
}
376381
if ( servers.size() == 0 )
377382
{
378-
servers.add( A );
383+
BoltServerAddress address = SERVERS.stream()
384+
.filter( Objects::nonNull )
385+
.findFirst()
386+
.orElseThrow( () -> new RuntimeException( "No non null server addresses are available" ) );
387+
servers.add( address );
379388
}
380389
ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers, null );
381390
return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) );

0 commit comments

Comments
 (0)