Skip to content

Update netty version to the latest. #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.Future;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -48,6 +48,8 @@
import org.neo4j.driver.internal.util.Futures;

import static java.lang.String.format;
import static org.neo4j.driver.internal.util.Futures.combineErrors;
import static org.neo4j.driver.internal.util.Futures.completedWithNullIfNonError;

public class ConnectionPoolImpl implements ConnectionPool
{
Expand All @@ -62,6 +64,7 @@ public class ConnectionPoolImpl implements ConnectionPool

private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionFactory connectionFactory;

public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings, MetricsListener metricsListener, Logging logging,
Expand Down Expand Up @@ -131,8 +134,14 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
if ( pool != null )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table", address );
closePool( pool );
"and is not in the routing table registry.", address );
// Close in the background
closePool( pool ).whenComplete( ( ignored, error ) -> {
if ( error != null )
{
log.warn( format( "An error occurred while closing connection pool towards %s.", address ), error );
}
} );
}
}
}
Expand All @@ -156,37 +165,40 @@ public CompletionStage<Void> close()
{
if ( closed.compareAndSet( false, true ) )
{
try
{
nettyChannelTracker.prepareToCloseChannels();
for ( Map.Entry<BoltServerAddress,ExtendedChannelPool> entry : pools.entrySet() )
{
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
closePool( pool );
}
nettyChannelTracker.prepareToCloseChannels();

pools.clear();
}
finally
{
CompletableFuture<Void> allPoolClosedFuture = CompletableFuture.allOf(
pools.entrySet().stream().map( entry -> {
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} ).toArray( CompletableFuture[]::new ) );

if (ownsEventLoopGroup) {
// We can only shutdown event loop group when all netty pools are fully closed,
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
pools.clear();
if ( !ownsEventLoopGroup )
{
completedWithNullIfNonError( closeFuture, pollCloseError );
}
else
{
// This is an attempt to speed up the shut down procedure of the driver
// Feel free return this back to shutdownGracefully() method with default values
// if this proves troublesome!!!
eventLoopGroup().shutdownGracefully(200, 15_000, TimeUnit.MILLISECONDS);
eventLoopGroup().shutdownGracefully( 200, 15_000, TimeUnit.MILLISECONDS );

Futures.asCompletionStage( eventLoopGroup().terminationFuture() ).whenComplete( ( ignore, eventLoopGroupTerminationError ) -> {
CompletionException combinedErrors = combineErrors( pollCloseError, eventLoopGroupTerminationError );
completedWithNullIfNonError( closeFuture, combinedErrors );
} );
}
}
}
if (!ownsEventLoopGroup)
{
return Futures.completedWithNull();
} );
}

return Futures.asCompletionStage( eventLoopGroup().terminationFuture() )
.thenApply( ignore -> null );
return closeFuture;
}

@Override
Expand All @@ -200,11 +212,11 @@ private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
return pools.computeIfAbsent( address, this::newPool );
}

private void closePool( ExtendedChannelPool pool )
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
{
pool.close();
// after the connection pool is removed/close, I can remove its metrics.
metricsListener.removePoolMetrics( pool.id() );
return pool.repeatableCloseAsync().whenComplete( ( ignored, error ) ->
// after the connection pool is removed/close, I can remove its metrics.
metricsListener.removePoolMetrics( pool.id() ) );
}

ExtendedChannelPool newPool( BoltServerAddress address )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import io.netty.channel.pool.ChannelPool;

import java.util.concurrent.CompletionStage;

public interface ExtendedChannelPool extends ChannelPool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wdyt about removing extends ChannelPool and exposing only the methods ConnectionPoolImpl really needs?
For example, this would hide close() and closeAsync() so ConnectionPoolImpl would only have a possibility to call repeatableCloseAsync()

{
boolean isClosed();

String id();

CompletionStage<Void> repeatableCloseAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.FixedChannelPool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.BoltServerAddress;
Expand All @@ -32,6 +34,7 @@

import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setPoolId;
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;

public class NettyChannelPool extends FixedChannelPool implements ExtendedChannelPool
{
Expand All @@ -49,6 +52,7 @@ public class NettyChannelPool extends FixedChannelPool implements ExtendedChanne
private final NettyChannelTracker handler;
private final AtomicBoolean closed = new AtomicBoolean( false );
private final String id;
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

public NettyChannelPool( BoltServerAddress address, ChannelConnector connector, Bootstrap bootstrap, NettyChannelTracker handler,
ChannelHealthChecker healthCheck, long acquireTimeoutMillis, int maxConnections )
Expand Down Expand Up @@ -85,19 +89,22 @@ protected ChannelFuture connectChannel( Bootstrap bootstrap )
}

@Override
public void close()
public CompletionStage<Void> repeatableCloseAsync()
{
if ( closed.compareAndSet( false, true ) )
{
super.close();
asCompletionStage( super.closeAsync(), closeFuture );
}
return closeFuture;
}

@Override
public boolean isClosed()
{
return closed.get();
}

@Override
public String id()
{
return this.id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ public void channelCreated( Channel channel, ListenerEvent creatingEvent )
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s",
channel.id(), channel.localAddress(), channel.remoteAddress() );

incrementInUse( channel );
incrementIdle( channel ); // when it is created, we count it as idle as it has not been acquired out of the pool
metricsListener.afterCreated( poolId( channel ), creatingEvent );

allChannels.add( channel );
}

Expand Down
18 changes: 18 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/util/Futures.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,27 @@ public static <T> CompletableFuture<T> completedWithNull()
return (CompletableFuture) COMPLETED_WITH_NULL;
}

public static <T> CompletableFuture<T> completedWithNullIfNonError( CompletableFuture<T> future, Throwable error )
{
if ( error != null )
{
future.completeExceptionally( error );
}
else
{
future.complete( null );
}
return future;
}

public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future )
{
CompletableFuture<T> result = new CompletableFuture<>();
return asCompletionStage( future, result );
}

public static <T> CompletionStage<T> asCompletionStage( io.netty.util.concurrent.Future<T> future, CompletableFuture<T> result )
{
if ( future.isCancelled() )
{
result.cancel( true );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ConnectionSettings;
Expand Down Expand Up @@ -86,6 +89,18 @@ void shouldAcquireIdleConnection()
assertNotNull( connection2 );
}

@Test
void shouldBeAbleToClosePoolInIOWorkerThread() throws Throwable
{
// In the IO worker thread of a channel obtained from a pool, we shall be able to close the pool.
CompletionStage<Void> future = pool.acquire( neo4j.address() ).thenCompose( Connection::release )
// This shall close all pools
.whenComplete( ( ignored, error ) -> pool.retainAll( Collections.emptySet() ) );

// We should be able to come to this line.
await( future );
}

@Test
void shouldFailToAcquireConnectionToWrongAddress()
{
Expand Down Expand Up @@ -118,12 +133,12 @@ void shouldFailToAcquireConnectionWhenPoolIsClosed()
{
await( pool.acquire( neo4j.address() ) );
ExtendedChannelPool channelPool = this.pool.getPool( neo4j.address() );
channelPool.close();
await( channelPool.repeatableCloseAsync() );
ServiceUnavailableException error =
assertThrows( ServiceUnavailableException.class, () -> await( pool.acquire( neo4j.address() ) ) );
assertThat( error.getMessage(), containsString( "closed while acquiring a connection" ) );
assertThat( error.getCause(), instanceOf( IllegalStateException.class ) );
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPooled was closed" ) );
assertThat( error.getCause().getMessage(), containsString( "FixedChannelPool was closed" ) );
}

private ConnectionPoolImpl newPool() throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.jupiter.api.Test;

Expand All @@ -44,6 +43,7 @@
import static org.neo4j.driver.internal.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.metrics.InternalAbstractMetrics.DEV_NULL_METRICS;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;

class ConnectionPoolImplTest
{
Expand Down Expand Up @@ -73,9 +73,9 @@ void shouldRetainSpecifiedAddresses()
pool.acquire( ADDRESS_3 );

pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_2, ADDRESS_3 ) ) );
for ( ChannelPool channelPool : pool.channelPoolsByAddress.values() )
for ( ExtendedChannelPool channelPool : pool.channelPoolsByAddress.values() )
{
verify( channelPool, never() ).close();
verify( channelPool, never() ).repeatableCloseAsync();
}
}

Expand All @@ -94,9 +94,9 @@ void shouldClosePoolsWhenRetaining()
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 3 );

pool.retainAll( new HashSet<>( asList( ADDRESS_1, ADDRESS_3 ) ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
verify( pool.getPool( ADDRESS_2 ) ).close();
verify( pool.getPool( ADDRESS_3 ), never() ).close();
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
verify( pool.getPool( ADDRESS_2 ) ).repeatableCloseAsync();
verify( pool.getPool( ADDRESS_3 ), never() ).repeatableCloseAsync();
}

@Test
Expand All @@ -114,9 +114,9 @@ void shouldNotClosePoolsWithActiveConnectionsWhenRetaining()
when( nettyChannelTracker.inUseChannelCount( ADDRESS_3 ) ).thenReturn( 0 );

pool.retainAll( singleton( ADDRESS_2 ) );
verify( pool.getPool( ADDRESS_1 ), never() ).close();
verify( pool.getPool( ADDRESS_2 ), never() ).close();
verify( pool.getPool( ADDRESS_3 ) ).close();
verify( pool.getPool( ADDRESS_1 ), never() ).repeatableCloseAsync();
verify( pool.getPool( ADDRESS_2 ), never() ).repeatableCloseAsync();
verify( pool.getPool( ADDRESS_3 ) ).repeatableCloseAsync();
}

private static PoolSettings newSettings()
Expand Down Expand Up @@ -147,6 +147,7 @@ ExtendedChannelPool newPool( BoltServerAddress address )
ExtendedChannelPool channelPool = mock( ExtendedChannelPool.class );
Channel channel = mock( Channel.class );
doReturn( ImmediateEventExecutor.INSTANCE.newSucceededFuture( channel ) ).when( channelPool ).acquire();
doReturn( completedWithNull() ).when( channelPool ).repeatableCloseAsync();
channelPoolsByAddress.put( address, channelPool );
return channelPool;
}
Expand Down
Loading