diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index d74fcc6b8e..0532c7c968 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -67,14 +67,20 @@ public void channelInactive( ChannelHandlerContext ctx ) { log.debug( "Channel is inactive" ); + String terminationReason = terminationReason( ctx.channel() ); + Throwable error = ErrorUtil.newConnectionTerminatedError( terminationReason ); + if ( !failed ) { // channel became inactive not because of a fatal exception that came from exceptionCaught // it is most likely inactive because actual network connection broke or was explicitly closed by the driver - String terminationReason = terminationReason( ctx.channel() ); - ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError( terminationReason ); - fail( ctx, error ); + messageDispatcher.handleChannelInactive( error ); + ctx.channel().close(); + } + else + { + fail( error ); } } @@ -89,16 +95,14 @@ public void exceptionCaught( ChannelHandlerContext ctx, Throwable error ) { failed = true; log.warn( "Fatal error occurred in the pipeline", error ); - fail( ctx, error ); + fail( error ); } } - private void fail( ChannelHandlerContext ctx, Throwable error ) + private void fail( Throwable error ) { Throwable cause = transformError( error ); messageDispatcher.handleChannelError( cause ); - log.debug( "Closing channel because of a failure '%s'", error ); - ctx.close(); } private static Throwable transformError( Throwable error ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 4d77a80a3a..fce30f8561 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Queue; +import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.handlers.ResetResponseHandler; import org.neo4j.driver.internal.logging.ChannelActivityLogger; import org.neo4j.driver.internal.messaging.ResponseMessageHandler; @@ -45,6 +46,7 @@ public class InboundMessageDispatcher implements ResponseMessageHandler private final Queue handlers = new LinkedList<>(); private final Logger log; + private volatile boolean gracefullyClosed; private Throwable currentError; private boolean fatalErrorOccurred; @@ -142,6 +144,20 @@ public void handleIgnoredMessage() handler.onFailure( error ); } + public void handleChannelInactive( Throwable cause ) + { + // report issue if the connection has not been terminated as a result of a graceful shutdown request from its + // parent pool + if ( !gracefullyClosed ) + { + handleChannelError( cause ); + } + else + { + channel.close(); + } + } + public void handleChannelError( Throwable error ) { if ( currentError != null ) @@ -160,6 +176,9 @@ public void handleChannelError( Throwable error ) ResponseHandler handler = removeHandler(); handler.onFailure( currentError ); } + + log.debug( "Closing channel because of a failure '%s'", error ); + channel.close(); } public void clearCurrentError() @@ -177,6 +196,11 @@ public boolean fatalErrorOccurred() return fatalErrorOccurred; } + public void prepareToCloseChannel( ) + { + this.gracefullyClosed = true; + } + /** * Visible for testing */ diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java index dbcbc75a02..eb4e0101ff 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java @@ -33,6 +33,7 @@ import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.cursor.AsyncResultCursorOnlyFactory; import org.neo4j.driver.internal.cursor.ResultCursorFactory; @@ -101,9 +102,13 @@ public void initializeChannel( String userAgent, AuthToken authToken, RoutingCon @Override public void prepareToCloseChannel( Channel channel ) { + InboundMessageDispatcher messageDispatcher = messageDispatcher( channel ); + GoodbyeMessage message = GoodbyeMessage.GOODBYE; - messageDispatcher( channel ).enqueue( NoOpResponseHandler.INSTANCE ); + messageDispatcher.enqueue( NoOpResponseHandler.INSTANCE ); channel.writeAndFlush( message, channel.voidPromise() ); + + messageDispatcher.prepareToCloseChannel( ); } @Override