Update netty version to the latest. #631

Merged 3 commits on Sep 26, 2019

NetworkConnection.java
@@ -19,7 +19,6 @@
package org.neo4j.driver.internal.async;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -28,6 +27,7 @@
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.async.connection.ChannelAttributes;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -57,15 +57,15 @@ public class NetworkConnection implements Connection
private final BoltServerAddress serverAddress;
private final ServerVersion serverVersion;
private final BoltProtocol protocol;
private final ChannelPool channelPool;
private final ExtendedChannelPool channelPool;
private final CompletableFuture<Void> releaseFuture;
private final Clock clock;

private final AtomicReference<Status> status = new AtomicReference<>( Status.OPEN );
private final MetricsListener metricsListener;
private final ListenerEvent inUseEvent;

public NetworkConnection( Channel channel, ChannelPool channelPool, Clock clock, MetricsListener metricsListener )
public NetworkConnection( Channel channel, ExtendedChannelPool channelPool, Clock clock, MetricsListener metricsListener )
{
this.channel = channel;
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );

ConnectionPoolImpl.java
@@ -21,11 +21,9 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +46,8 @@
import org.neo4j.driver.internal.util.Futures;

import static java.lang.String.format;
import static org.neo4j.driver.internal.util.Futures.combineErrors;
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;

public class ConnectionPoolImpl implements ConnectionPool
{
@@ -62,6 +62,7 @@ public class ConnectionPoolImpl implements ConnectionPool

private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionFactory connectionFactory;

public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
@@ -95,9 +96,9 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )

ListenerEvent acquireEvent = metricsListener.createListenerEvent();
metricsListener.beforeAcquiringOrCreating( pool.id(), acquireEvent );
Future<Channel> connectionFuture = pool.acquire();
CompletionStage<Channel> channelFuture = pool.acquire();

return Futures.asCompletionStage( connectionFuture ).handle( ( channel, error ) ->
return channelFuture.handle( ( channel, error ) ->
{
try
{
@@ -131,8 +132,8 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
if ( pool != null )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table", address );
closePool( pool );
"and is not in the routing table registry.", address );
closePoolInBackground( address, pool );
}
}
}
@@ -156,37 +157,24 @@ public CompletionStage<Void> close()
{
if ( closed.compareAndSet( false, true ) )
{
try
{
nettyChannelTracker.prepareToCloseChannels();
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
{
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
closePool( pool );
}
nettyChannelTracker.prepareToCloseChannels();
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();

// We can only shut down the event loop group when all netty pools are fully closed,
// otherwise the netty pools might be missing threads (from the event loop group) to execute clean-ups.
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
pools.clear();
}
finally
{

if (ownsEventLoopGroup) {
// This is an attempt to speed up the shut down procedure of the driver
// Feel free return this back to shutdownGracefully() method with default values
// if this proves troublesome!!!
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
if ( !ownsEventLoopGroup )
{
completeWithNullIfNoError( closeFuture, pollCloseError );
}
}
}
if (!ownsEventLoopGroup)
{
return Futures.completedWithNull();
else
{
shutdownEventLoopGroup( pollCloseError );
}
} );
}

return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
.thenApply( ignore -> null );
return closeFuture;
}

@Override
@@ -195,31 +183,10 @@ public boolean isOpen( BoltServerAddress address )
return pools.containsKey( address );
}

private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
return pools.computeIfAbsent( address, this::newPool );
}

private void closePool( ExtendedChannelPool pool )
{
pool.close();
// after the connection pool is removed/close, I can remove its metrics.
metricsListener.removePoolMetrics( pool.id() );
}

ExtendedChannelPool newPool( BoltServerAddress address )
{
NettyChannelPool pool =
new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
settings.maxConnectionPoolSize() );
// before the connection pool is added I can add the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
return pool;
}

private EventLoopGroup eventLoopGroup()
@Override
public String toString()
{
return bootstrap.config().group();
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
}

private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -259,26 +226,84 @@ private void assertNotClosed()
}
}

private void assertNotClosed( BoltServerAddress address, Channel channel, ChannelPool pool )
private void assertNotClosed( BoltServerAddress address, Channel channel, ExtendedChannelPool pool )
{
if ( closed.get() )
{
pool.release( channel );
pool.close();
closePoolInBackground( address, pool );
pools.remove( address );
assertNotClosed();
}
}

@Override
public String toString()
{
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
}

// for testing only
ExtendedChannelPool getPool( BoltServerAddress address )
{
return pools.get( address );
}

ExtendedChannelPool newPool( BoltServerAddress address )
{
return new NettyChannelPool( address, connector, bootstrap, nettyChannelTracker, channelHealthChecker, settings.connectionAcquisitionTimeout(),
settings.maxConnectionPoolSize() );
}

private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
return pools.computeIfAbsent( address, ignored -> {
ExtendedChannelPool pool = newPool( address );
// Before the connection pool is added, we can register the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
return pool;
} );
}

private CompletionStage<Void> closePool( ExtendedChannelPool pool )
{
return pool.close().whenComplete( ( ignored, error ) ->
// After the connection pool is removed/closed, we can remove its metrics.
metricsListener.removePoolMetrics( pool.id() ) );
}

private void closePoolInBackground( BoltServerAddress address, ExtendedChannelPool pool )
{
// Close in the background
closePool( pool ).whenComplete( ( ignored, error ) -> {
if ( error != null )
{
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
}
} );
}

private EventLoopGroup eventLoopGroup()
{
return bootstrap.config().group();
}

private void shutdownEventLoopGroup( Throwable pollCloseError )
{
// This is an attempt to speed up the shutdown procedure of the driver.
// This timeout is needed for `closePoolInBackground` to finish its background jobs, especially for races between `acquire` and `close`.
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );

Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
.whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
completeWithNullIfNoError( closeFuture, combinedErrors );
} );
}

private CompletableFuture<Void> closeAllPools()
{
return CompletableFuture.allOf(
pools.entrySet().stream().map( entry -> {
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} ).toArray( CompletableFuture[]::new ) );
}
}
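
For readers following the new shutdown flow: below is a minimal, self-contained sketch (not part of this PR) of the CompletableFuture.allOf(...).whenComplete(...) pattern that the reworked close(), closeAllPools() and shutdownEventLoopGroup() rely on. Every pool is closed concurrently, and the follow-up step runs only once all of them have completed. The class name and printed messages are illustrative only.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseAllPoolsSketch
{
    public static void main( String[] args )
    {
        // Stand-ins for the per-pool close futures produced by closePool( pool ).
        List<CompletableFuture<Void>> poolCloseFutures = Arrays.asList(
                CompletableFuture.runAsync( () -> System.out.println( "pool 1 closed" ) ),
                CompletableFuture.runAsync( () -> System.out.println( "pool 2 closed" ) ) );

        // Complete a single future once every pool close has finished,
        // mirroring closeAllPools() in this PR.
        CompletableFuture<Void> allPoolsClosed =
                CompletableFuture.allOf( poolCloseFutures.toArray( new CompletableFuture[0] ) );

        // Only then run the follow-up step (the analogue of shutting down the event loop group).
        allPoolsClosed.whenComplete( ( ignored, error ) ->
                System.out.println( "all pools closed, error = " + error ) ).join();
    }
}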

ExtendedChannelPool.java
@@ -18,11 +18,19 @@
*/
package org.neo4j.driver.internal.async.pool;

import io.netty.channel.pool.ChannelPool;
import io.netty.channel.Channel;

public interface ExtendedChannelPool extends ChannelPool
import java.util.concurrent.CompletionStage;

public interface ExtendedChannelPool
{
CompletionStage<Channel> acquire();

CompletionStage<Void> release( Channel channel );

boolean isClosed();

String id();

CompletionStage<Void> close();
}
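
Since ExtendedChannelPool no longer extends Netty's ChannelPool, implementations and callers work purely with CompletionStage. The following deliberately naive, hypothetical single-channel implementation (not part of this PR) only illustrates the shape of the new contract; NettyChannelPool is the real implementation, and EmbeddedChannel is used here just to obtain a Channel without opening a connection. A caller then chains directly on the returned stage, e.g. pool.acquire().handle( ( channel, error ) -> ... ), which is exactly the change made in ConnectionPoolImpl.acquire() above.

import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.async.pool.ExtendedChannelPool;

// Sketch only: a single-channel pool that satisfies the CompletionStage-based contract.
class SingleChannelPool implements ExtendedChannelPool
{
    private final Channel channel = new EmbeddedChannel();
    private final AtomicBoolean closed = new AtomicBoolean();

    @Override
    public CompletionStage<Channel> acquire()
    {
        // Hand out the single channel; a real pool would enforce limits and acquisition timeouts.
        return CompletableFuture.completedFuture( channel );
    }

    @Override
    public CompletionStage<Void> release( Channel channel )
    {
        // Nothing to return to; a real pool would mark the channel as idle again.
        return CompletableFuture.completedFuture( null );
    }

    @Override
    public boolean isClosed()
    {
        return closed.get();
    }

    @Override
    public String id()
    {
        return "single-channel-pool";
    }

    @Override
    public CompletionStage<Void> close()
    {
        closed.set( true );
        channel.close();
        return CompletableFuture.completedFuture( null );
    }
}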