Skip to content

Commit acb1f68

Browse files
author
Zhen Li
committed
Deadlock Issues caused by Netty Blocking ChannelPool#close.
The new netty version provides both blocking pool close `FixedChannelPool#close` and async pool close `FixedChannelPool#closeAsync`. The blocking close causes deadlock issues in our code. One deadlock issue happens when `ConnectionPoolImpl#retainAll` trying to close a netty pool inside an IO thread that is used by one of the channels owned by the closing pool. The deadlock happens because 1) The channel cannot be closed until the IO thread is fully free. 2) The pool cannot be closed until the channel is closed. 3) However the IO thread cannot be free until the pool is closed. `pool --wait-on--> channel --wait-on--> IO thread --wait-on--> pool` The fix is to change 3) to let the IO thread to call async pool close to not block the close operation. Another deadlock issue happens in `Driver#close`. In `Driver#close`, we need to chain the event loop group shutdown after all pool close. As pool close tasks are executed on event loop group, if we shutdown event loop group first, then the pool close will be stuck because of no thread to pick up any shutdown tasks.
1 parent 5418cb7 commit acb1f68

File tree

6 files changed

+100
-43
lines changed

6 files changed

+100
-43
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import io.netty.channel.pool.ChannelPool;
2525
import io.netty.util.concurrent.Future;
2626

27-
import java.util.Map;
2827
import java.util.Set;
28+
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.CompletionException;
3030
import java.util.concurrent.CompletionStage;
3131
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +48,8 @@
4848
import org.neo4j.driver.internal.util.Futures;
4949

5050
import static java.lang.String.format;
51+
import static org.neo4j.driver.internal.util.Futures.combineErrors;
52+
import static org.neo4j.driver.internal.util.Futures.completedWithNullIfNonError;
5153

5254
public class ConnectionPoolImpl implements ConnectionPool
5355
{
@@ -62,6 +64,7 @@ public class ConnectionPoolImpl implements ConnectionPool
6264

6365
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
6466
private final AtomicBoolean closed = new AtomicBoolean();
67+
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6568
private final ConnectionFactory connectionFactory;
6669

6770
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
@@ -131,8 +134,14 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
131134
if ( pool != null )
132135
{
133136
log.info( "Closing connection pool towards %s, it has no active connections " +
134-
"and is not in the routing table", address );
135-
closePool( pool );
137+
"and is not in the routing table registry.", address );
138+
// Close in the background
139+
closePool( pool ).whenComplete( ( ignored, error ) -> {
140+
if ( error != null )
141+
{
142+
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
143+
}
144+
} );
136145
}
137146
}
138147
}
@@ -156,37 +165,40 @@ public CompletionStage<Void> close()
156165
{
157166
if ( closed.compareAndSet( false, true ) )
158167
{
159-
try
160-
{
161-
nettyChannelTracker.prepareToCloseChannels();
162-
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
163-
{
164-
BoltServerAddress address = entry.getKey();
165-
ExtendedChannelPool pool = entry.getValue();
166-
log.info( "Closing connection pool towards %s", address );
167-
closePool( pool );
168-
}
168+
nettyChannelTracker.prepareToCloseChannels();
169169

170-
pools.clear();
171-
}
172-
finally
173-
{
170+
CompletableFuture<Void> allPoolClosedFuture = CompletableFuture.allOf(
171+
pools.entrySet().stream().map( entry -> {
172+
BoltServerAddress address = entry.getKey();
173+
ExtendedChannelPool pool = entry.getValue();
174+
log.info( "Closing connection pool towards %s", address );
175+
// Wait for all pools to be closed.
176+
return closePool( pool ).toCompletableFuture();
177+
} ).toArray( CompletableFuture[]::new ) );
174178

175-
if (ownsEventLoopGroup) {
179+
// We can only shutdown event loop group when all netty pools are fully closed,
180+
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
181+
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
182+
pools.clear();
183+
if ( !ownsEventLoopGroup )
184+
{
185+
completedWithNullIfNonError( closeFuture, pollCloseError );
186+
}
187+
else
188+
{
176189
// This is an attempt to speed up the shut down procedure of the driver
177190
// Feel free return this back to shutdownGracefully() method with default values
178191
// if this proves troublesome!!!
179-
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
192+
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
193+
194+
Futures.asCompletionStage( eventLoopGroup().terminationFuture() ).whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
195+
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
196+
completedWithNullIfNonError( closeFuture, combinedErrors );
197+
} );
180198
}
181-
}
182-
}
183-
if (!ownsEventLoopGroup)
184-
{
185-
return Futures.completedWithNull();
199+
} );
186200
}
187-
188-
return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
189-
.thenApply( ignore -> null );
201+
return closeFuture;
190202
}
191203

192204
@Override
@@ -200,11 +212,11 @@ private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
200212
return pools.computeIfAbsent( address, this::newPool );
201213
}
202214

203-
private void closePool( ExtendedChannelPool pool )
215+
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
204216
{
205-
pool.close();
206-
// after the connection pool is removed/close, I can remove its metrics.
207-
metricsListener.removePoolMetrics( pool.id() );
217+
return pool.repeatableCloseAsync().whenComplete( ( ignored, error ) ->
218+
// after the connection pool is removed/close, I can remove its metrics.
219+
metricsListener.removePoolMetrics( pool.id() ) );
208220
}
209221

210222
ExtendedChannelPool newPool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/async/pool/ExtendedChannelPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import io.netty.channel.pool.ChannelPool;
2222

23+
import java.util.concurrent.CompletionStage;
24+
2325
public interface ExtendedChannelPool extends ChannelPool
2426
{
2527
boolean isClosed();
2628

2729
String id();
30+
31+
CompletionStage<Void> repeatableCloseAsync();
2832
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.netty.channel.pool.ChannelHealthChecker;
2525
import io.netty.channel.pool.FixedChannelPool;
2626

27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.CompletionStage;
2729
import java.util.concurrent.atomic.AtomicBoolean;
2830

2931
import org.neo4j.driver.internal.BoltServerAddress;
@@ -32,6 +34,7 @@
3234

3335
import static java.util.Objects.requireNonNull;
3436
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setPoolId;
37+
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
3538

3639
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
3740
{
@@ -49,6 +52,7 @@ public class NettyChannelPool extends FixedChannelPool implements ExtendedChanne
4952
private final NettyChannelTracker handler;
5053
private final AtomicBoolean closed = new AtomicBoolean( false );
5154
private final String id;
55+
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
5256

5357
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
5458
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
@@ -85,19 +89,22 @@ protected ChannelFuture connectChannel( Bootstrap bootstrap )
8589
}
8690

8791
@Override
88-
public void close()
92+
public CompletionStage<Void> repeatableCloseAsync()
8993
{
9094
if ( closed.compareAndSet( false, true ) )
9195
{
92-
super.close();
96+
asCompletionStage( super.closeAsync(), closeFuture );
9397
}
98+
return closeFuture;
9499
}
95100

101+
@Override
96102
public boolean isClosed()
97103
{
98104
return closed.get();
99105
}
100106

107+
@Override
101108
public String id()
102109
{
103110
return this.id;

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,27 @@ public static <T> CompletableFuture<T> completedWithNull()
4545
return (CompletableFuture) COMPLETED_WITH_NULL;
4646
}
4747

48+
public static <T> CompletableFuture<T> completedWithNullIfNonError( CompletableFuture<T> future, Throwable error )
49+
{
50+
if ( error != null )
51+
{
52+
future.completeExceptionally( error );
53+
}
54+
else
55+
{
56+
future.complete( null );
57+
}
58+
return future;
59+
}
60+
4861
public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future )
4962
{
5063
CompletableFuture<T> result = new CompletableFuture<>();
64+
return asCompletionStage( future, result );
65+
}
66+
67+
public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future, CompletableFuture<T> result )
68+
{
5169
if ( future.isCancelled() )
5270
{
5371
result.cancel( true );

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.junit.jupiter.api.Test;
2525
import org.junit.jupiter.api.extension.RegisterExtension;
2626

27+
import java.util.Collections;
28+
import java.util.concurrent.CompletionStage;
29+
2730
import org.neo4j.driver.exceptions.ServiceUnavailableException;
2831
import org.neo4j.driver.internal.BoltServerAddress;
2932
import org.neo4j.driver.internal.ConnectionSettings;
@@ -86,6 +89,18 @@ void shouldAcquireIdleConnection()
8689
assertNotNull( connection2 );
8790
}
8891

92+
@Test
93+
void shouldBeAbleToClosePoolInIOWorkerThread() throws Throwable
94+
{
95+
// In the IO worker thread of a channel obtained from a pool, we shall be able to close the pool.
96+
CompletionStage<Void> future = pool.acquire( neo4j.address() ).thenCompose( Connection::release )
97+
// This shall close all pools
98+
.whenComplete( ( ignored, error ) -> pool.retainAll( Collections.emptySet() ) );
99+
100+
// We should be able to come to this line.
101+
await( future );
102+
}
103+
89104
@Test
90105
void shouldFailToAcquireConnectionToWrongAddress()
91106
{
@@ -118,7 +133,7 @@ void shouldFailToAcquireConnectionWhenPoolIsClosed()
118133
{
119134
await( pool.acquire( neo4j.address() ) );
120135
ExtendedChannelPool channelPool = this.pool.getPool( neo4j.address() );
121-
channelPool.close();
136+
await( channelPool.repeatableCloseAsync() );
122137
ServiceUnavailableException error =
123138
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
124139
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );

driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
23-
import io.netty.channel.pool.ChannelPool;
2423
import io.netty.util.concurrent.ImmediateEventExecutor;
2524
import org.junit.jupiter.api.Test;
2625

@@ -44,6 +43,7 @@
4443
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
4544
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
4645
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
46+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4747

4848
class ConnectionPoolImplTest
4949
{
@@ -73,9 +73,9 @@ void shouldRetainSpecifiedAddresses()
7373
pool.acquire( ADDRESS_3 );
7474

7575
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) );
76-
for ( ChannelPool channelPool : pool.channelPoolsByAddress.values() )
76+
for ( ExtendedChannelPool channelPool : pool.channelPoolsByAddress.values() )
7777
{
78-
verify( channelPool, never() ).close();
78+
verify( channelPool, never() ).repeatableCloseAsync();
7979
}
8080
}
8181

@@ -94,9 +94,9 @@ void shouldClosePoolsWhenRetaining()
9494
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );
9595

9696
pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
97-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
98-
verify( pool.getPool( ADDRESS_2 ) ).close();
99-
verify( pool.getPool( ADDRESS_3 ), never() ).close();
97+
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
98+
verify( pool.getPool( ADDRESS_2 ) ).repeatableCloseAsync();
99+
verify( pool.getPool( ADDRESS_3 ), never() ).repeatableCloseAsync();
100100
}
101101

102102
@Test
@@ -114,9 +114,9 @@ void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
114114
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );
115115

116116
pool.retainAll( singleton( ADDRESS_2 ) );
117-
verify( pool.getPool( ADDRESS_1 ), never() ).close();
118-
verify( pool.getPool( ADDRESS_2 ), never() ).close();
119-
verify( pool.getPool( ADDRESS_3 ) ).close();
117+
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
118+
verify( pool.getPool( ADDRESS_2 ), never() ).repeatableCloseAsync();
119+
verify( pool.getPool( ADDRESS_3 ) ).repeatableCloseAsync();
120120
}
121121

122122
private static PoolSettings newSettings()
@@ -147,6 +147,7 @@ ExtendedChannelPool newPool( BoltServerAddress address )
147147
ExtendedChannelPool channelPool = mock( ExtendedChannelPool.class );
148148
Channel channel = mock( Channel.class );
149149
doReturn( ImmediateEventExecutor.INSTANCE.newSucceededFuture( channel ) ).when( channelPool ).acquire();
150+
doReturn( completedWithNull() ).when( channelPool ).repeatableCloseAsync();
150151
channelPoolsByAddress.put( address, channelPool );
151152
return channelPool;
152153
}

0 commit comments

Comments
 (0)