Skip to content

Commit 4e3b75e

Browse files
committed
Improve connection release handling and improve flaky test (neo4j#1092) (neo4j#1140)
This update ensures connection release stages are linked. In addition, it improves stability of flaky `RoutingTableAndConnectionPoolTest.shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool` test.
1 parent 6cd2169 commit 4e3b75e

File tree

3 files changed

+36
-18
lines changed

3 files changed

+36
-18
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static java.util.Collections.emptyMap;
4444
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
4545
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setTerminationReason;
46+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
4647

4748
/**
4849
* This connection represents a simple network connection to a remote server.
@@ -178,10 +179,14 @@ public void terminateAndRelease( String reason )
178179
if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) )
179180
{
180181
setTerminationReason( channel, reason );
181-
channel.close();
182-
channelPool.release( channel );
183-
releaseFuture.complete( null );
184-
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
182+
asCompletionStage( channel.close() )
183+
.exceptionally( throwable -> null )
184+
.thenCompose( ignored -> channelPool.release( channel ) )
185+
.whenComplete( ( ignored, throwable ) ->
186+
{
187+
releaseFuture.complete( null );
188+
metricsListener.afterConnectionReleased( poolId( this.channel ), this.inUseEvent );
189+
} );
185190
}
186191
}
187192

driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.neo4j.driver.internal.util.Clock;
2929

3030
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setLastUsedTimestamp;
31+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
32+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
3133

3234
public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
3335
{
@@ -47,18 +49,20 @@ public ChannelReleasingResetResponseHandler( Channel channel, ExtendedChannelPoo
4749
@Override
4850
protected void resetCompleted( CompletableFuture<Void> completionFuture, boolean success )
4951
{
52+
CompletionStage<Void> closureStage;
5053
if ( success )
5154
{
5255
// update the last-used timestamp before returning the channel back to the pool
5356
setLastUsedTimestamp( channel, clock.millis() );
57+
closureStage = completedWithNull();
5458
}
5559
else
5660
{
5761
// close the channel before returning it back to the pool if RESET failed
58-
channel.close();
62+
closureStage = asCompletionStage( channel.close() );
5963
}
60-
61-
CompletionStage<Void> released = pool.release( channel );
62-
released.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
64+
closureStage.exceptionally( throwable -> null )
65+
.thenCompose( ignored -> pool.release( channel ) )
66+
.whenComplete( ( ignore, error ) -> completionFuture.complete( null ) );
6367
}
6468
}

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import java.time.Duration;
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
28+
import java.util.Collections;
2829
import java.util.HashSet;
2930
import java.util.LinkedList;
3031
import java.util.List;
32+
import java.util.Objects;
3133
import java.util.Random;
3234
import java.util.Set;
3335
import java.util.concurrent.CompletableFuture;
@@ -43,6 +45,7 @@
4345
import org.neo4j.driver.exceptions.FatalDiscoveryException;
4446
import org.neo4j.driver.exceptions.ProtocolException;
4547
import org.neo4j.driver.internal.BoltServerAddress;
48+
import org.neo4j.driver.internal.DatabaseNameUtil;
4649
import org.neo4j.driver.internal.async.connection.BootstrapFactory;
4750
import org.neo4j.driver.internal.async.pool.NettyChannelTracker;
4851
import org.neo4j.driver.internal.async.pool.PoolSettings;
@@ -84,7 +87,7 @@ class RoutingTableAndConnectionPoolTest
8487
private static final BoltServerAddress D = new BoltServerAddress( "localhost:30003" );
8588
private static final BoltServerAddress E = new BoltServerAddress( "localhost:30004" );
8689
private static final BoltServerAddress F = new BoltServerAddress( "localhost:30005" );
87-
private static final List<BoltServerAddress> SERVERS = new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) );
90+
private static final List<BoltServerAddress> SERVERS = Collections.synchronizedList( new LinkedList<>( Arrays.asList( null, A, B, C, D, E, F ) ) );
8891

8992
private static final String[] DATABASES = new String[]{"", SYSTEM_DATABASE_NAME, "my database"};
9093

@@ -93,7 +96,7 @@ class RoutingTableAndConnectionPoolTest
9396
private final Logging logging = none();
9497

9598
@Test
96-
void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
99+
void shouldAddServerToRoutingTableAndConnectionPool()
97100
{
98101
// Given
99102
ConnectionPool connectionPool = newConnectionPool();
@@ -113,7 +116,7 @@ void shouldAddServerToRoutingTableAndConnectionPool() throws Throwable
113116
}
114117

115118
@Test
116-
void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
119+
void shouldNotAddToRoutingTableWhenFailedWithRoutingError()
117120
{
118121
// Given
119122
ConnectionPool connectionPool = newConnectionPool();
@@ -132,7 +135,7 @@ void shouldNotAddToRoutingTableWhenFailedWithRoutingError() throws Throwable
132135
}
133136

134137
@Test
135-
void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
138+
void shouldNotAddToRoutingTableWhenFailedWithProtocolError()
136139
{
137140
// Given
138141
ConnectionPool connectionPool = newConnectionPool();
@@ -151,7 +154,7 @@ void shouldNotAddToRoutingTableWhenFailedWithProtocolError() throws Throwable
151154
}
152155

153156
@Test
154-
void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
157+
void shouldNotAddToRoutingTableWhenFailedWithSecurityError()
155158
{
156159
// Given
157160
ConnectionPool connectionPool = newConnectionPool();
@@ -170,7 +173,7 @@ void shouldNotAddToRoutingTableWhenFailedWithSecurityError() throws Throwable
170173
}
171174

172175
@Test
173-
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
176+
void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired()
174177
{
175178
// Given
176179
ConnectionPool connectionPool = newConnectionPool();
@@ -193,7 +196,7 @@ void shouldNotRemoveNewlyAddedRoutingTableEvenIfItIsExpired() throws Throwable
193196
}
194197

195198
@Test
196-
void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
199+
void shouldRemoveExpiredRoutingTableAndServers()
197200
{
198201
// Given
199202
ConnectionPool connectionPool = newConnectionPool();
@@ -218,7 +221,7 @@ void shouldRemoveExpiredRoutingTableAndServers() throws Throwable
218221
}
219222

220223
@Test
221-
void shouldRemoveExpiredRoutingTableButNotServer() throws Throwable
224+
void shouldRemoveExpiredRoutingTableButNotServer()
222225
{
223226
// Given
224227
ConnectionPool connectionPool = newConnectionPool();
@@ -255,7 +258,7 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl
255258
acquireAndReleaseConnections( loadBalancer );
256259
Set<BoltServerAddress> servers = routingTables.allServers();
257260
BoltServerAddress openServer = null;
258-
for( BoltServerAddress server: servers )
261+
for ( BoltServerAddress server : servers )
259262
{
260263
if ( connectionPool.isOpen( server ) )
261264
{
@@ -267,6 +270,8 @@ void shouldHandleAddAndRemoveFromRoutingTableAndConnectionPool() throws Throwabl
267270

268271
// if we remove the open server from servers, then the connection pool should remove the server from the pool.
269272
SERVERS.remove( openServer );
273+
// ensure rediscovery is necessary on subsequent interaction
274+
Arrays.stream( DATABASES ).map( DatabaseNameUtil::database ).forEach( routingTables::remove );
270275
acquireAndReleaseConnections( loadBalancer );
271276

272277
assertFalse( connectionPool.isOpen( openServer ) );
@@ -366,7 +371,11 @@ public CompletionStage<ClusterCompositionLookupResult> lookupClusterComposition(
366371
}
367372
if ( servers.size() == 0 )
368373
{
369-
servers.add( A );
374+
BoltServerAddress address = SERVERS.stream()
375+
.filter( Objects::nonNull )
376+
.findFirst()
377+
.orElseThrow( () -> new RuntimeException( "No non null server addresses are available" ) );
378+
servers.add( address );
370379
}
371380
ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers );
372381
return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) );

0 commit comments

Comments
 (0)