Skip to content

Commit e5a6f48

Browse files
author
Zhen Li
committed
Exposing only the methods ConnectionPoolImpl really needs
1 parent decba71 commit e5a6f48

13 files changed

+303
-335
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NetworkConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.Channel;
22-
import io.netty.channel.pool.ChannelPool;
2322

2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.CompletionStage;
@@ -28,6 +27,7 @@
2827
import org.neo4j.driver.internal.BoltServerAddress;
2928
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
3029
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
30+
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
3131
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
3232
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3333
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -57,15 +57,15 @@ public class NetworkConnection implements Connection
5757
private final BoltServerAddress serverAddress;
5858
private final ServerVersion serverVersion;
5959
private final BoltProtocol protocol;
60-
private final ChannelPool channelPool;
60+
private final ExtendedChannelPool channelPool;
6161
private final CompletableFuture<Void> releaseFuture;
6262
private final Clock clock;
6363

6464
private final AtomicReference<Status> status = new AtomicReference<>( Status.OPEN );
6565
private final MetricsListener metricsListener;
6666
private final ListenerEvent inUseEvent;
6767

68-
public NetworkConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener )
68+
public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
6969
{
7070
this.channel = channel;
7171
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 76 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.EventLoopGroup;
24-
import io.netty.channel.pool.ChannelPool;
25-
import io.netty.util.concurrent.Future;
2624

2725
import java.util.Set;
2826
import java.util.concurrent.CompletableFuture;
@@ -49,7 +47,7 @@
4947

5048
import static java.lang.String.format;
5149
import static org.neo4j.driver.internal.util.Futures.combineErrors;
52-
import static org.neo4j.driver.internal.util.Futures.completedWithNullIfNonError;
50+
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;
5351

5452
public class ConnectionPoolImpl implements ConnectionPool
5553
{
@@ -98,9 +96,9 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
9896

9997
ListenerEvent acquireEvent = metricsListener.createListenerEvent();
10098
metricsListener.beforeAcquiringOrCreating( pool.id(), acquireEvent );
101-
Future<Channel> connectionFuture = pool.acquire();
99+
CompletionStage<Channel> channelFuture = pool.acquire();
102100

103-
return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
101+
return channelFuture.handle( ( channel, error ) ->
104102
{
105103
try
106104
{
@@ -135,13 +133,7 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
135133
{
136134
log.info( "Closing connection pool towards %s, it has no active connections " +
137135
"and is not in the routing table registry.", address );
138-
// Close in the background
139-
closePool( pool ).whenComplete( ( ignored, error ) -> {
140-
if ( error != null )
141-
{
142-
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
143-
}
144-
} );
136+
closePoolInBackground( address, pool );
145137
}
146138
}
147139
}
@@ -166,35 +158,19 @@ public CompletionStage<Void> close()
166158
if ( closed.compareAndSet( false, true ) )
167159
{
168160
nettyChannelTracker.prepareToCloseChannels();
169-
170-
CompletableFuture<Void> allPoolClosedFuture = CompletableFuture.allOf(
171-
pools.entrySet().stream().map( entry -> {
172-
BoltServerAddress address = entry.getKey();
173-
ExtendedChannelPool pool = entry.getValue();
174-
log.info( "Closing connection pool towards %s", address );
175-
// Wait for all pools to be closed.
176-
return closePool( pool ).toCompletableFuture();
177-
} ).toArray( CompletableFuture[]::new ) );
161+
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();
178162

179163
// We can only shutdown event loop group when all netty pools are fully closed,
180164
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
181165
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
182166
pools.clear();
183167
if ( !ownsEventLoopGroup )
184168
{
185-
completedWithNullIfNonError( closeFuture, pollCloseError );
169+
completeWithNullIfNoError( closeFuture, pollCloseError );
186170
}
187171
else
188172
{
189-
// This is an attempt to speed up the shut down procedure of the driver
190-
// Feel free return this back to shutdownGracefully() method with default values
191-
// if this proves troublesome!!!
192-
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
193-
194-
Futures.asCompletionStage( eventLoopGroup().terminationFuture() ).whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
195-
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
196-
completedWithNullIfNonError( closeFuture, combinedErrors );
197-
} );
173+
shutdownEventLoopGroup( pollCloseError );
198174
}
199175
} );
200176
}
@@ -207,31 +183,10 @@ public boolean isOpen( BoltServerAddress address )
207183
return pools.containsKey( address );
208184
}
209185

210-
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
211-
{
212-
return pools.computeIfAbsent( address, this::newPool );
213-
}
214-
215-
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
216-
{
217-
return pool.repeatableCloseAsync().whenComplete( ( ignored, error ) ->
218-
// after the connection pool is removed/close, I can remove its metrics.
219-
metricsListener.removePoolMetrics( pool.id() ) );
220-
}
221-
222-
ExtendedChannelPool newPool( BoltServerAddress address )
223-
{
224-
NettyChannelPool pool =
225-
new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
226-
settings.maxConnectionPoolSize() );
227-
// before the connection pool is added I can add the metrics for the pool.
228-
metricsListener.putPoolMetrics( pool.id(), address, this );
229-
return pool;
230-
}
231-
232-
private EventLoopGroup eventLoopGroup()
186+
@Override
187+
public String toString()
233188
{
234-
return bootstrap.config().group();
189+
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
235190
}
236191

237192
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -271,26 +226,84 @@ private void assertNotClosed()
271226
}
272227
}
273228

274-
private void assertNotClosed( BoltServerAddress address, Channel channel, ChannelPool pool )
229+
private void assertNotClosed( BoltServerAddress address, Channel channel, ExtendedChannelPool pool )
275230
{
276231
if ( closed.get() )
277232
{
278233
pool.release( channel );
279-
pool.close();
234+
closePoolInBackground( address, pool );
280235
pools.remove( address );
281236
assertNotClosed();
282237
}
283238
}
284239

285-
@Override
286-
public String toString()
287-
{
288-
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
289-
}
290-
291240
// for testing only
292241
ExtendedChannelPool getPool( BoltServerAddress address )
293242
{
294243
return pools.get( address );
295244
}
245+
246+
ExtendedChannelPool newPool( BoltServerAddress address )
247+
{
248+
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
249+
settings.maxConnectionPoolSize() );
250+
}
251+
252+
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
253+
{
254+
return pools.computeIfAbsent( address, ignored -> {
255+
ExtendedChannelPool pool = newPool( address );
256+
// before the connection pool is added I can add the metrics for the pool.
257+
metricsListener.putPoolMetrics( pool.id(), address, this );
258+
return pool;
259+
} );
260+
}
261+
262+
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
263+
{
264+
return pool.close().whenComplete( ( ignored, error ) ->
265+
// after the connection pool is removed/close, I can remove its metrics.
266+
metricsListener.removePoolMetrics( pool.id() ) );
267+
}
268+
269+
private void closePoolInBackground( BoltServerAddress address, ExtendedChannelPool pool )
270+
{
271+
// Close in the background
272+
closePool( pool ).whenComplete( ( ignored, error ) -> {
273+
if ( error != null )
274+
{
275+
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
276+
}
277+
} );
278+
}
279+
280+
private EventLoopGroup eventLoopGroup()
281+
{
282+
return bootstrap.config().group();
283+
}
284+
285+
private void shutdownEventLoopGroup( Throwable pollCloseError )
286+
{
287+
// This is an attempt to speed up the shut down procedure of the driver
288+
// This timeout is needed for `closePoolInBackground` to finish background job, especially for races between `acquire` and `close`.
289+
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );
290+
291+
Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
292+
.whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
293+
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
294+
completeWithNullIfNoError( closeFuture, combinedErrors );
295+
} );
296+
}
297+
298+
private CompletableFuture<Void> closeAllPools()
299+
{
300+
return CompletableFuture.allOf(
301+
pools.entrySet().stream().map( entry -> {
302+
BoltServerAddress address = entry.getKey();
303+
ExtendedChannelPool pool = entry.getValue();
304+
log.info( "Closing connection pool towards %s", address );
305+
// Wait for all pools to be closed.
306+
return closePool( pool ).toCompletableFuture();
307+
} ).toArray( CompletableFuture[]::new ) );
308+
}
296309
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/ExtendedChannelPool.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.neo4j.driver.internal.async.pool;
2020

21-
import io.netty.channel.pool.ChannelPool;
21+
import io.netty.channel.Channel;
2222

2323
import java.util.concurrent.CompletionStage;
2424

25-
public interface ExtendedChannelPool extends ChannelPool
25+
public interface ExtendedChannelPool
2626
{
27+
CompletionStage<Channel> acquire();
28+
29+
CompletionStage<Void> release( Channel channel );
30+
2731
boolean isClosed();
2832

2933
String id();
3034

31-
CompletionStage<Void> repeatableCloseAsync();
35+
CompletionStage<Void> close();
3236
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelPool.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setPoolId;
3737
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
3838

39-
public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
39+
public class NettyChannelPool implements ExtendedChannelPool
4040
{
4141
/**
4242
* Unlimited amount of parties are allowed to request channels from the pool.
@@ -47,57 +47,66 @@ public class NettyChannelPool extends FixedChannelPool implements ExtendedChanne
4747
*/
4848
private static final boolean RELEASE_HEALTH_CHECK = false;
4949

50-
private final BoltServerAddress address;
51-
private final ChannelConnector connector;
52-
private final NettyChannelTracker handler;
50+
private final FixedChannelPool delegate;
5351
private final AtomicBoolean closed = new AtomicBoolean( false );
5452
private final String id;
5553
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
5654

57-
public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
55+
NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
5856
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
5957
{
60-
super( bootstrap, handler, healthCheck, AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
61-
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK );
62-
63-
this.address = requireNonNull( address );
64-
this.connector = requireNonNull( connector );
65-
this.handler = requireNonNull( handler );
58+
requireNonNull( address );
59+
requireNonNull( connector );
60+
requireNonNull( handler );
6661
this.id = poolId( address );
67-
}
68-
69-
@Override
70-
protected ChannelFuture connectChannel( Bootstrap bootstrap )
71-
{
72-
ListenerEvent creatingEvent = handler.channelCreating( this.id );
73-
ChannelFuture channelFuture = connector.connect( address, bootstrap );
74-
channelFuture.addListener( future ->
62+
this.delegate = new FixedChannelPool( bootstrap, handler, healthCheck, FixedChannelPool.AcquireTimeoutAction.FAIL, acquireTimeoutMillis, maxConnections,
63+
MAX_PENDING_ACQUIRES, RELEASE_HEALTH_CHECK )
7564
{
76-
if ( future.isSuccess() )
77-
{
78-
// notify pool handler about a successful connection
79-
Channel channel = channelFuture.channel();
80-
setPoolId( channel, this.id );
81-
handler.channelCreated( channel, creatingEvent );
82-
}
83-
else
65+
@Override
66+
protected ChannelFuture connectChannel( Bootstrap bootstrap )
8467
{
85-
handler.channelFailedToCreate( this.id );
68+
ListenerEvent creatingEvent = handler.channelCreating( id );
69+
ChannelFuture channelFuture = connector.connect( address, bootstrap );
70+
channelFuture.addListener( future -> {
71+
if ( future.isSuccess() )
72+
{
73+
// notify pool handler about a successful connection
74+
Channel channel = channelFuture.channel();
75+
setPoolId( channel, id );
76+
handler.channelCreated( channel, creatingEvent );
77+
}
78+
else
79+
{
80+
handler.channelFailedToCreate( id );
81+
}
82+
} );
83+
return channelFuture;
8684
}
87-
} );
88-
return channelFuture;
85+
};
8986
}
9087

9188
@Override
92-
public CompletionStage<Void> repeatableCloseAsync()
89+
public CompletionStage<Void> close()
9390
{
9491
if ( closed.compareAndSet( false, true ) )
9592
{
96-
asCompletionStage( super.closeAsync(), closeFuture );
93+
asCompletionStage( delegate.closeAsync(), closeFuture );
9794
}
9895
return closeFuture;
9996
}
10097

98+
@Override
99+
public CompletionStage<Channel> acquire()
100+
{
101+
return asCompletionStage( delegate.acquire() );
102+
}
103+
104+
@Override
105+
public CompletionStage<Void> release( Channel channel )
106+
{
107+
return asCompletionStage( delegate.release( channel ) );
108+
}
109+
101110
@Override
102111
public boolean isClosed()
103112
{

0 commit comments

Comments
 (0)