Skip to content

Commit 0b193dc

Browse files
author
Zhen Li
committed
Exposing only the methods ConnectionPoolImpl really needs
1 parent decba71 commit 0b193dc

File tree

12 files changed

+178
-349
lines changed

12 files changed

+178
-349
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.pool.ChannelPool;
2322

2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.CompletionStage;
@@ -28,6 +27,7 @@
2827
import org.neo4j.driver.internal.BoltServerAddress;
2928
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
3029
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
30+
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
3131
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
3232
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3333
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -57,15 +57,15 @@ public class NetworkConnection implements Connection
5757
private final BoltServerAddress serverAddress;
5858
private final ServerVersion serverVersion;
5959
private final BoltProtocol protocol;
60-
private final ChannelPool channelPool;
60+
private final ExtendedChannelPool channelPool;
6161
private final CompletableFuture<Void> releaseFuture;
6262
private final Clock clock;
6363

6464
private final AtomicReference<Status> status = new AtomicReference<>( Status.OPEN );
6565
private final MetricsListener metricsListener;
6666
private final ListenerEvent inUseEvent;
6767

68-
public NetworkConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener )
68+
public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
6969
{
7070
this.channel = channel;
7171
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 75 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.EventLoopGroup;
24-
import io.netty.channel.pool.ChannelPool;
25-
import io.netty.util.concurrent.Future;
2624

2725
import java.util.Set;
2826
import java.util.concurrent.CompletableFuture;
@@ -49,7 +47,7 @@
4947

5048
import static java.lang.String.format;
5149
import static org.neo4j.driver.internal.util.Futures.combineErrors;
52-
import static org.neo4j.driver.internal.util.Futures.completedWithNullIfNonError;
50+
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;
5351

5452
public class ConnectionPoolImpl implements ConnectionPool
5553
{
@@ -98,9 +96,9 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
9896

9997
ListenerEvent acquireEvent = metricsListener.createListenerEvent();
10098
metricsListener.beforeAcquiringOrCreating( pool.id(), acquireEvent );
101-
Future<Channel> connectionFuture = pool.acquire();
99+
CompletionStage<Channel> connectionFuture = pool.acquire();
102100

103-
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
101+
return connectionFuture.handle( ( channel, error ) ->
104102
{
105103
try
106104
{
@@ -135,13 +133,7 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
135133
{
136134
log.info( "Closing connection pool towards %s, it has no active connections " +
137135
"and is not in the routing table registry.", address );
138-
// Close in the background
139-
closePool( pool ).whenComplete( ( ignored, error ) -> {
140-
if ( error != null )
141-
{
142-
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
143-
}
144-
} );
136+
closePoolInBackground( address, pool );
145137
}
146138
}
147139
}
@@ -166,35 +158,19 @@ public CompletionStage<Void> close()
166158
if ( closed.compareAndSet( false, true ) )
167159
{
168160
nettyChannelTracker.prepareToCloseChannels();
169-
170-
CompletableFuture<Void> allPoolClosedFuture = CompletableFuture.allOf(
171-
pools.entrySet().stream().map( entry -> {
172-
BoltServerAddress address = entry.getKey();
173-
ExtendedChannelPool pool = entry.getValue();
174-
log.info( "Closing connection pool towards %s", address );
175-
// Wait for all pools to be closed.
176-
return closePool( pool ).toCompletableFuture();
177-
} ).toArray( CompletableFuture[]::new ) );
161+
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();
178162

179163
// We can only shutdown event loop group when all netty pools are fully closed,
180164
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
181165
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
182166
pools.clear();
183167
if ( !ownsEventLoopGroup )
184168
{
185-
completedWithNullIfNonError( closeFuture, pollCloseError );
169+
completeWithNullIfNoError( closeFuture, pollCloseError );
186170
}
187171
else
188172
{
189-
// This is an attempt to speed up the shut down procedure of the driver
190-
// Feel free return this back to shutdownGracefully() method with default values
191-
// if this proves troublesome!!!
192-
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
193-
194-
Futures.asCompletionStage( eventLoopGroup().terminationFuture() ).whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
195-
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
196-
completedWithNullIfNonError( closeFuture, combinedErrors );
197-
} );
173+
shutdownEventLoopGroup( pollCloseError );
198174
}
199175
} );
200176
}
@@ -207,31 +183,10 @@ public boolean isOpen( BoltServerAddress address )
207183
return pools.containsKey( address );
208184
}
209185

210-
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
211-
{
212-
return pools.computeIfAbsent( address, this::newPool );
213-
}
214-
215-
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
216-
{
217-
return pool.repeatableCloseAsync().whenComplete( ( ignored, error ) ->
218-
// after the connection pool is removed/close, I can remove its metrics.
219-
metricsListener.removePoolMetrics( pool.id() ) );
220-
}
221-
222-
ExtendedChannelPool newPool( BoltServerAddress address )
223-
{
224-
NettyChannelPool pool =
225-
new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
226-
settings.maxConnectionPoolSize() );
227-
// before the connection pool is added I can add the metrics for the pool.
228-
metricsListener.putPoolMetrics( pool.id(), address, this );
229-
return pool;
230-
}
231-
232-
private EventLoopGroup eventLoopGroup()
186+
@Override
187+
public String toString()
233188
{
234-
return bootstrap.config().group();
189+
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
235190
}
236191

237192
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -271,26 +226,83 @@ private void assertNotClosed()
271226
}
272227
}
273228

274-
private void assertNotClosed( BoltServerAddress address, Channel channel, ChannelPool pool )
229+
private void assertNotClosed( BoltServerAddress address, Channel channel, ExtendedChannelPool pool )
275230
{
276231
if ( closed.get() )
277232
{
278233
pool.release( channel );
279-
pool.close();
234+
closePoolInBackground( address, pool );
280235
pools.remove( address );
281236
assertNotClosed();
282237
}
283238
}
284239

285-
@Override
286-
public String toString()
287-
{
288-
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
289-
}
290-
291240
// for testing only
292241
ExtendedChannelPool getPool( BoltServerAddress address )
293242
{
294243
return pools.get( address );
295244
}
245+
246+
ExtendedChannelPool newPool( BoltServerAddress address )
247+
{
248+
NettyChannelPool pool =
249+
new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
250+
settings.maxConnectionPoolSize() );
251+
// before the connection pool is added I can add the metrics for the pool.
252+
metricsListener.putPoolMetrics( pool.id(), address, this );
253+
return pool;
254+
}
255+
256+
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
257+
{
258+
return pools.computeIfAbsent( address, this::newPool );
259+
}
260+
261+
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
262+
{
263+
return pool.close().whenComplete( ( ignored, error ) ->
264+
// after the connection pool is removed/close, I can remove its metrics.
265+
metricsListener.removePoolMetrics( pool.id() ) );
266+
}
267+
268+
private void closePoolInBackground( BoltServerAddress address, ExtendedChannelPool pool )
269+
{
270+
// Close in the background
271+
closePool( pool ).whenComplete( ( ignored, error ) -> {
272+
if ( error != null )
273+
{
274+
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
275+
}
276+
} );
277+
}
278+
279+
private EventLoopGroup eventLoopGroup()
280+
{
281+
return bootstrap.config().group();
282+
}
283+
284+
private void shutdownEventLoopGroup( Throwable pollCloseError )
285+
{
286+
// This is an attempt to speed up the shut down procedure of the driver
287+
// This timeout is needed for `closePoolInBackground` to finish background job, especially for races between `acquire` and `close`.
288+
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
289+
290+
Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
291+
.whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
292+
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
293+
completeWithNullIfNoError( closeFuture, combinedErrors );
294+
} );
295+
}
296+
297+
private CompletableFuture<Void> closeAllPools()
298+
{
299+
return CompletableFuture.allOf(
300+
pools.entrySet().stream().map( entry -> {
301+
BoltServerAddress address = entry.getKey();
302+
ExtendedChannelPool pool = entry.getValue();
303+
log.info( "Closing connection pool towards %s", address );
304+
// Wait for all pools to be closed.
305+
return closePool( pool ).toCompletableFuture();
306+
} ).toArray( CompletableFuture[]::new ) );
307+
}
296308
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/ExtendedChannelPool.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.neo4j.driver.internal.async.pool;
2020

21-
import io.netty.channel.pool.ChannelPool;
21+
import io.netty.channel.Channel;
2222

2323
import java.util.concurrent.CompletionStage;
2424

25-
public interface ExtendedChannelPool extends ChannelPool
25+
public interface ExtendedChannelPool
2626
{
27+
CompletionStage<Channel> acquire();
28+
29+
CompletionStage<Void> release( Channel channel );
30+
2731
boolean isClosed();
2832

2933
String id();
3034

31-
CompletionStage<Void> repeatableCloseAsync();
35+
CompletionStage<Void> close();
3236
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setPoolId;
3737
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
3838

39-
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
39+
public class NettyChannelPool implements ExtendedChannelPool
4040
{
4141
/**
4242
* Unlimited amount of parties are allowed to request channels from the pool.
@@ -47,57 +47,66 @@ public class NettyChannelPool extends FixedChannelPool implements ExtendedChanne
4747
*/
4848
private static final boolean RELEASE_HEALTH_CHECK = false;
4949

50-
private final BoltServerAddress address;
51-
private final ChannelConnector connector;
52-
private final NettyChannelTracker handler;
50+
private final FixedChannelPool delegate;
5351
private final AtomicBoolean closed = new AtomicBoolean( false );
5452
private final String id;
5553
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
5654

57-
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
55+
NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
5856
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
5957
{
60-
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
61-
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK );
62-
63-
this.address = requireNonNull( address );
64-
this.connector = requireNonNull( connector );
65-
this.handler = requireNonNull( handler );
58+
requireNonNull( address );
59+
requireNonNull( connector );
60+
requireNonNull( handler );
6661
this.id = poolId( address );
67-
}
68-
69-
@Override
70-
protected ChannelFuture connectChannel( Bootstrap bootstrap )
71-
{
72-
ListenerEvent creatingEvent = handler.channelCreating( this.id );
73-
ChannelFuture channelFuture = connector.connect( address, bootstrap );
74-
channelFuture.addListener( future ->
62+
this.delegate = new FixedChannelPool( bootstrap, handler, healthCheck, FixedChannelPool.AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
63+
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK )
7564
{
76-
if ( future.isSuccess() )
77-
{
78-
// notify pool handler about a successful connection
79-
Channel channel = channelFuture.channel();
80-
setPoolId( channel, this.id );
81-
handler.channelCreated( channel, creatingEvent );
82-
}
83-
else
65+
@Override
66+
protected ChannelFuture connectChannel( Bootstrap bootstrap )
8467
{
85-
handler.channelFailedToCreate( this.id );
68+
ListenerEvent creatingEvent = handler.channelCreating( id );
69+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
70+
channelFuture.addListener( future -> {
71+
if ( future.isSuccess() )
72+
{
73+
// notify pool handler about a successful connection
74+
Channel channel = channelFuture.channel();
75+
setPoolId( channel, id );
76+
handler.channelCreated( channel, creatingEvent );
77+
}
78+
else
79+
{
80+
handler.channelFailedToCreate( id );
81+
}
82+
} );
83+
return channelFuture;
8684
}
87-
} );
88-
return channelFuture;
85+
};
8986
}
9087

9188
@Override
92-
public CompletionStage<Void> repeatableCloseAsync()
89+
public CompletionStage<Void> close()
9390
{
9491
if ( closed.compareAndSet( false, true ) )
9592
{
96-
asCompletionStage( super.closeAsync(), closeFuture );
93+
asCompletionStage( delegate.closeAsync(), closeFuture );
9794
}
9895
return closeFuture;
9996
}
10097

98+
@Override
99+
public CompletionStage<Channel> acquire()
100+
{
101+
return asCompletionStage( delegate.acquire() );
102+
}
103+
104+
@Override
105+
public CompletionStage<Void> release( Channel channel )
106+
{
107+
return asCompletionStage( delegate.release( channel ) );
108+
}
109+
101110
@Override
102111
public boolean isClosed()
103112
{

0 commit comments

Comments
 (0)