Skip to content

Commit 11edaec

Browse files
author
Zhen Li
committed
Deadlock Issues caused by Netty Blocking ChannelPool#close.
The new netty version provides both a blocking pool close (`FixedChannelPool#close`) and an async pool close (`FixedChannelPool#closeAsync`). The blocking close causes deadlock issues in our code. One deadlock happens when `ConnectionPoolImpl#retainAll` tries to close a netty pool from inside an IO thread that is used by one of the channels owned by the pool being closed. The deadlock happens because 1) the channel cannot be closed until the IO thread is fully free, 2) the pool cannot be closed until the channel is closed, and 3) the IO thread cannot become free until the pool is closed: `pool --wait-on--> channel --wait-on--> IO thread --wait-on--> pool`. The fix is to change 3) so that the IO thread calls the async pool close and is therefore not blocked by the close operation. Another deadlock happens in `Driver#close`. In `Driver#close`, we need to chain the event loop group shutdown after all pools have been closed. Because pool close tasks are executed on the event loop group, shutting down the event loop group first would leave the pool close stuck, with no thread left to pick up the close tasks.
1 parent 5418cb7 commit 11edaec

File tree

6 files changed

+84
-43
lines changed

6 files changed

+84
-43
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import io.netty.channel.pool.ChannelPool;
2525
import io.netty.util.concurrent.Future;
2626

27-
import java.util.Map;
2827
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionException;
3030
import java.util.concurrent.CompletionStage;
3131
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +48,8 @@
4848
import org.neo4j.driver.internal.util.Futures;
4949

5050
import static java.lang.String.format;
51+
import static org.neo4j.driver.internal.util.Futures.combineErrors;
52+
import static org.neo4j.driver.internal.util.Futures.completedWithNullIfNonError;
5153

5254
public class ConnectionPoolImpl implements ConnectionPool
5355
{
@@ -62,6 +64,7 @@ public class ConnectionPoolImpl implements ConnectionPool
6264

6365
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
6466
private final AtomicBoolean closed = new AtomicBoolean();
67+
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6568
private final ConnectionFactory connectionFactory;
6669

6770
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
@@ -131,8 +134,14 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
131134
if ( pool != null )
132135
{
133136
log.info( "Closing connection pool towards %s, it has no active connections " +
134-
"and is not in the routing table", address );
135-
closePool( pool );
137+
"and is not in the routing table registry.", address );
138+
// Close in the background
139+
closePool( pool ).whenComplete( ( ignored, error ) -> {
140+
if ( error != null )
141+
{
142+
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
143+
}
144+
} );
136145
}
137146
}
138147
}
@@ -156,37 +165,40 @@ public CompletionStage<Void> close()
156165
{
157166
if ( closed.compareAndSet( false, true ) )
158167
{
159-
try
160-
{
161-
nettyChannelTracker.prepareToCloseChannels();
162-
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
163-
{
164-
BoltServerAddress address = entry.getKey();
165-
ExtendedChannelPool pool = entry.getValue();
166-
log.info( "Closing connection pool towards %s", address );
167-
closePool( pool );
168-
}
168+
nettyChannelTracker.prepareToCloseChannels();
169169

170-
pools.clear();
171-
}
172-
finally
173-
{
170+
CompletableFuture<Void> allPoolClosedFuture = CompletableFuture.allOf(
171+
pools.entrySet().stream().map( entry -> {
172+
BoltServerAddress address = entry.getKey();
173+
ExtendedChannelPool pool = entry.getValue();
174+
log.info( "Closing connection pool towards %s", address );
175+
// Wait for all pools to be closed.
176+
return closePool( pool );
177+
} ).toArray( CompletableFuture[]::new ) );
174178

175-
if (ownsEventLoopGroup) {
179+
// We can only shutdown event loop group when all netty pools are fully closed,
180+
// otherwise the netty pools might be missing threads (from the event loop group) to execute their clean-ups.
181+
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
182+
pools.clear();
183+
if ( !ownsEventLoopGroup )
184+
{
185+
completedWithNullIfNonError( closeFuture, pollCloseError );
186+
}
187+
else
188+
{
176189
// This is an attempt to speed up the shut down procedure of the driver
177190
// Feel free to return this back to shutdownGracefully() method with default values
178191
// if this proves troublesome!!!
179-
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
192+
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
193+
194+
Futures.asCompletionStage( eventLoopGroup().terminationFuture() ).whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
195+
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
196+
completedWithNullIfNonError( closeFuture, combinedErrors );
197+
} );
180198
}
181-
}
182-
}
183-
if (!ownsEventLoopGroup)
184-
{
185-
return Futures.completedWithNull();
199+
} );
186200
}
187-
188-
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
189-
.thenApply( ignore -> null );
201+
return closeFuture;
190202
}
191203

192204
@Override
@@ -200,11 +212,11 @@ private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
200212
return pools.computeIfAbsent( address, this::newPool );
201213
}
202214

203-
private void closePool( ExtendedChannelPool pool )
215+
private CompletableFuture<Void> closePool( ExtendedChannelPool pool )
204216
{
205-
pool.close();
206-
// after the connection pool is removed/close, I can remove its metrics.
207-
metricsListener.removePoolMetrics( pool.id() );
217+
return pool.repeatableCloseAsync().whenComplete( ( ignored, error ) ->
218+
// after the connection pool is removed/close, I can remove its metrics.
219+
metricsListener.removePoolMetrics( pool.id() ) );
208220
}
209221

210222
ExtendedChannelPool newPool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/async/pool/ExtendedChannelPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import io.netty.channel.pool.ChannelPool;
2222

23+
import java.util.concurrent.CompletableFuture;
24+
2325
public interface ExtendedChannelPool extends ChannelPool
2426
{
2527
boolean isClosed();
2628

2729
String id();
30+
31+
CompletableFuture<Void> repeatableCloseAsync();
2832
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.channel.pool.ChannelHealthChecker;
2525
import io.netty.channel.pool.FixedChannelPool;
2626

27+
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829

2930
import org.neo4j.driver.internal.BoltServerAddress;
@@ -32,6 +33,7 @@
3233

3334
import static java.util.Objects.requireNonNull;
3435
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setPoolId;
36+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
3537

3638
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
3739
{
@@ -49,6 +51,7 @@ public class NettyChannelPool extends FixedChannelPool implements ExtendedChanne
4951
private final NettyChannelTracker handler;
5052
private final AtomicBoolean closed = new AtomicBoolean( false );
5153
private final String id;
54+
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
5255

5356
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
5457
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
@@ -85,19 +88,22 @@ protected ChannelFuture connectChannel( Bootstrap bootstrap )
8588
}
8689

8790
@Override
88-
public void close()
91+
public CompletableFuture<Void> repeatableCloseAsync()
8992
{
9093
if ( closed.compareAndSet( false, true ) )
9194
{
92-
super.close();
95+
asCompletionStage( super.closeAsync(), closeFuture );
9396
}
97+
return closeFuture;
9498
}
9599

100+
@Override
96101
public boolean isClosed()
97102
{
98103
return closed.get();
99104
}
100105

106+
@Override
101107
public String id()
102108
{
103109
return this.id;

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,27 @@ public static <T> CompletableFuture<T> completedWithNull()
4545
return (CompletableFuture) COMPLETED_WITH_NULL;
4646
}
4747

48+
public static <T> CompletableFuture<T> completedWithNullIfNonError( CompletableFuture<T> future, Throwable error )
49+
{
50+
if ( error != null )
51+
{
52+
future.completeExceptionally( error );
53+
}
54+
else
55+
{
56+
future.complete( null );
57+
}
58+
return future;
59+
}
60+
4861
public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future )
4962
{
5063
CompletableFuture<T> result = new CompletableFuture<>();
64+
return asCompletionStage( future, result );
65+
}
66+
67+
public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future, CompletableFuture<T> result )
68+
{
5169
if ( future.isCancelled() )
5270
{
5371
result.cancel( true );

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void shouldFailToAcquireConnectionWhenPoolIsClosed()
118118
{
119119
await( pool.acquire( neo4j.address() ) );
120120
ExtendedChannelPool channelPool = this.pool.getPool( neo4j.address() );
121-
channelPool.close();
121+
await( channelPool.repeatableCloseAsync() );
122122
ServiceUnavailableException error =
123123
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
124124
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
23-
import io.netty.channel.pool.ChannelPool;
2423
import io.netty.util.concurrent.ImmediateEventExecutor;
2524
import org.junit.jupiter.api.Test;
2625

@@ -44,6 +43,7 @@
4443
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
4544
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
4645
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
46+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4747

4848
class ConnectionPoolImplTest
4949
{
@@ -73,9 +73,9 @@ void shouldRetainSpecifiedAddresses()
7373
pool.acquire( ADDRESS_3 );
7474

7575
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) );
76-
for ( ChannelPool channelPool : pool.channelPoolsByAddress.values() )
76+
for ( ExtendedChannelPool channelPool : pool.channelPoolsByAddress.values() )
7777
{
78-
verify( channelPool, never() ).close();
78+
verify( channelPool, never() ).repeatableCloseAsync();
7979
}
8080
}
8181

@@ -94,9 +94,9 @@ void shouldClosePoolsWhenRetaining()
9494
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );
9595

9696
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
97-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
98-
verify( pool.getPool( ADDRESS_2 ) ).close();
99-
verify( pool.getPool( ADDRESS_3 ), never() ).close();
97+
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
98+
verify( pool.getPool( ADDRESS_2 ) ).repeatableCloseAsync();
99+
verify( pool.getPool( ADDRESS_3 ), never() ).repeatableCloseAsync();
100100
}
101101

102102
@Test
@@ -114,9 +114,9 @@ void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
114114
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );
115115

116116
pool.retainAll( singleton( ADDRESS_2 ) );
117-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
118-
verify( pool.getPool( ADDRESS_2 ), never() ).close();
119-
verify( pool.getPool( ADDRESS_3 ) ).close();
117+
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
118+
verify( pool.getPool( ADDRESS_2 ), never() ).repeatableCloseAsync();
119+
verify( pool.getPool( ADDRESS_3 ) ).repeatableCloseAsync();
120120
}
121121

122122
private static PoolSettings newSettings()
@@ -147,6 +147,7 @@ ExtendedChannelPool newPool( BoltServerAddress address )
147147
ExtendedChannelPool channelPool = mock( ExtendedChannelPool.class );
148148
Channel channel = mock( Channel.class );
149149
doReturn( ImmediateEventExecutor.INSTANCE.newSucceededFuture( channel ) ).when( channelPool ).acquire();
150+
doReturn( completedWithNull() ).when( channelPool ).repeatableCloseAsync();
150151
channelPoolsByAddress.put( address, channelPool );
151152
return channelPool;
152153
}

0 commit comments

Comments
 (0)