From 740405c0abcef406ae2fdb8824614bc4ae6aceac Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 1 Dec 2017 17:09:21 +0100 Subject: [PATCH 1/4] Handle thread interruption by closing the channel Blocking API like `Session#run()` is now implemented on top of async API, i.e. `Session#runAsync()`. Blocking calls simply invoke async ones and uninterruptibly wait for them to complete. Previously, with blocking IO, thread interruptions resulted in `ClosedByInterruptException` which terminated query/transaction in a harsh way. Right now interrupts will do nothing. Handling them is not strictly required because `Thread#interrupt()` is not a nice thing to do. However, interrupts are still used in some environments as a last resort thing. This commit makes driver react on thread interrupts by closing the underlying Netty channel. It's only done in places where channel is actually available. Closing of channel results in exception and the waiting thread fails. It will also make database reset state for this connection, terminate transaction and query. Such aproach seems very similar to the previous one except it will result in `ServiceUnavailableException` instead of `ClosedByInterruptException`. --- .../neo4j/driver/internal/DriverFactory.java | 4 +- .../driver/internal/ExplicitTransaction.java | 14 ++- .../neo4j/driver/internal/InternalDriver.java | 4 +- .../internal/InternalStatementResult.java | 19 ++- .../neo4j/driver/internal/NetworkSession.java | 44 +++++-- .../internal/async/ChannelAttributes.java | 11 ++ .../internal/async/NettyConnection.java | 14 +++ .../internal/async/RoutingConnection.java | 6 + .../async/inbound/ChannelErrorHandler.java | 7 +- .../neo4j/driver/internal/spi/Connection.java | 2 + .../neo4j/driver/internal/util/ErrorUtil.java | 9 ++ .../neo4j/driver/internal/util/Futures.java | 32 +++++ .../internal/InternalStatementResultTest.java | 2 +- .../internal/async/ChannelAttributesTest.java | 26 +++++ .../async/ChannelErrorHandlerTest.java | 18 +++ .../internal/async/NettyConnectionTest.java | 109 +++++++++++++++++- .../driver/internal/util/ErrorUtilTest.java | 27 +++++ .../util/FailingConnectionDriverFactory.java | 6 + .../driver/v1/integration/SessionIT.java | 62 ++++++++++ 19 files changed, 394 insertions(+), 22 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 0d3a964d4e..6bb5a00838 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -82,7 +82,9 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r { InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings, eventExecutorGroup, securityPlan, retryLogic ); - Futures.blockingGet( driver.verifyConnectivity() ); + + // block to verify connectivity, close connection pool if thread gets interrupted + Futures.blockingGet( driver.verifyConnectivity(), connectionPool::close ); return driver; } catch ( Throwable driverError ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index c4454018e7..fd35de583a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -47,7 +47,6 @@ import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.util.Futures.blockingGet; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -148,7 +147,8 @@ public void failure() @Override public void close() { - blockingGet( closeAsync() ); + Futures.blockingGet( closeAsync(), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the transaction" ) ); } CompletionStage closeAsync() @@ -272,8 +272,9 @@ public CompletionStage runAsync( String statementTemplate @Override public StatementResult run( Statement statement ) { - StatementResultCursor cursor = blockingGet( run( statement, false ) ); - return new InternalStatementResult( cursor ); + StatementResultCursor cursor = Futures.blockingGet( run( statement, false ), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in transaction" ) ); + return new InternalStatementResult( connection, cursor ); } @Override @@ -392,4 +393,9 @@ private BiConsumer transactionClosed( State newState ) session.setBookmark( bookmark ); }; } + + private void terminateConnectionOnThreadInterrupt( String reason ) + { + connection.terminateAndRelease( reason ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index bac3c344f2..fbb5143803 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Logger; @@ -29,7 +30,6 @@ import org.neo4j.driver.v1.Session; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.util.Futures.blockingGet; public class InternalDriver implements Driver { @@ -105,7 +105,7 @@ private Session newSession( AccessMode mode, Bookmark bookmark ) @Override public void close() { - blockingGet( closeAsync() ); + Futures.blockingGet( closeAsync() ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java index 3b9b986f0a..861fe42284 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java @@ -19,7 +19,10 @@ package org.neo4j.driver.internal; import java.util.List; +import java.util.concurrent.CompletionStage; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.StatementResultCursor; @@ -28,15 +31,15 @@ import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.util.Function; -import static org.neo4j.driver.internal.util.Futures.blockingGet; - public class InternalStatementResult implements StatementResult { + private final Connection connection; private final StatementResultCursor cursor; private List keys; - public InternalStatementResult( StatementResultCursor cursor ) + public InternalStatementResult( Connection connection, StatementResultCursor cursor ) { + this.connection = connection; this.cursor = cursor; } @@ -114,4 +117,14 @@ public void remove() { throw new ClientException( "Removing records from a result is not supported." ); } + + private T blockingGet( CompletionStage stage ) + { + return Futures.blockingGet( stage, this::terminateConnectionOnThreadInterrupt ); + } + + private void terminateConnectionOnThreadInterrupt() + { + connection.terminateAndRelease( "Thread interrupted while waiting for result to arrive" ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index cb4160af6f..9afa9e9f0b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -47,7 +47,6 @@ import org.neo4j.driver.v1.types.TypeSystem; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.neo4j.driver.internal.util.Futures.blockingGet; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -132,8 +131,12 @@ public CompletionStage runAsync( String statementText, Va @Override public StatementResult run( Statement statement ) { - StatementResultCursor cursor = blockingGet( run( statement, false ) ); - return new InternalStatementResult( cursor ); + StatementResultCursor cursor = Futures.blockingGet( run( statement, false ), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while running query in session" ) ); + + // query executed, it is safe to obtain a connection in a blocking way + Connection connection = Futures.getNow( connectionStage ); + return new InternalStatementResult( connection, cursor ); } @Override @@ -152,7 +155,8 @@ public boolean isOpen() @Override public void close() { - blockingGet( closeAsync() ); + Futures.blockingGet( closeAsync(), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while closing the session" ) ); } @Override @@ -188,7 +192,7 @@ public CompletionStage closeAsync() @Override public Transaction beginTransaction() { - return blockingGet( beginTransactionAsync( mode ) ); + return beginTransaction( mode ); } @Deprecated @@ -247,7 +251,8 @@ public String lastBookmark() @Override public void reset() { - blockingGet( resetAsync() ); + Futures.blockingGet( resetAsync(), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while resetting the session" ) ); } private CompletionStage resetAsync() @@ -287,7 +292,7 @@ private T transaction( AccessMode mode, TransactionWork work ) // event loop thread will bock and wait for itself to read some data return retryLogic.retry( () -> { - try ( Transaction tx = blockingGet( beginTransactionAsync( mode ) ) ) + try ( Transaction tx = beginTransaction( mode ) ) { try { @@ -422,6 +427,12 @@ private CompletionStage run( Statement statement, return newResultCursorStage; } + private Transaction beginTransaction( AccessMode mode ) + { + return Futures.blockingGet( beginTransactionAsync( mode ), + () -> terminateConnectionOnThreadInterrupt( "Thread interrupted while starting a transaction" ) ); + } + private CompletionStage beginTransactionAsync( AccessMode mode ) { ensureSessionIsOpen(); @@ -515,6 +526,25 @@ private CompletionStage releaseConnection() } ); } + private void terminateConnectionOnThreadInterrupt( String reason ) + { + // try to get current connection in a blocking fashion + Connection connection = null; + try + { + connection = Futures.getNow( connectionStage ); + } + catch ( Throwable ignore ) + { + // ignore errors because handing interruptions is best effort + } + + if ( connection != null ) + { + connection.terminateAndRelease( reason ); + } + } + private CompletionStage ensureNoOpenTxBeforeRunningQuery() { return ensureNoOpenTx( "Statements cannot be run directly on a session with an open transaction; " + diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java index d9f1dd83e4..5ac3e2c257 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java @@ -34,6 +34,7 @@ public final class ChannelAttributes private static final AttributeKey CREATION_TIMESTAMP = newInstance( "creationTimestamp" ); private static final AttributeKey LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" ); private static final AttributeKey MESSAGE_DISPATCHER = newInstance( "messageDispatcher" ); + private static final AttributeKey TERMINATION_REASON = newInstance( "terminationReason" ); private ChannelAttributes() { @@ -89,6 +90,16 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher ); } + public static String terminationReason( Channel channel ) + { + return get( channel, TERMINATION_REASON ); + } + + public static void setTerminationReason( Channel channel, String reason ) + { + setOnce( channel, TERMINATION_REASON, reason ); + } + private static T get( Channel channel, AttributeKey key ) { return channel.attr( key ).get(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index dd85211cca..b9a1ee107c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -39,6 +39,8 @@ import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Value; +import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason; + public class NettyConnection implements Connection { private final Channel channel; @@ -115,6 +117,18 @@ public CompletionStage release() return releaseFuture; } + @Override + public void terminateAndRelease( String reason ) + { + if ( open.compareAndSet( true, false ) ) + { + setTerminationReason( channel, reason ); + channel.close(); + channelPool.release( channel ); + releaseFuture.complete( null ); + } + } + @Override public BoltServerAddress serverAddress() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index 78fc44ca51..2abd7d8a5a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -83,6 +83,12 @@ public CompletionStage release() return delegate.release(); } + @Override + public void terminateAndRelease( String reason ) + { + delegate.terminateAndRelease( reason ); + } + @Override public BoltServerAddress serverAddress() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java index 7f5a7ab7c7..e156428b7c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/ChannelErrorHandler.java @@ -32,6 +32,7 @@ import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; +import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; public class ChannelErrorHandler extends ChannelInboundHandlerAdapter { @@ -69,8 +70,10 @@ public void channelInactive( ChannelHandlerContext ctx ) if ( !failed ) { // channel became inactive not because of a fatal exception that came from exceptionCaught - // it is most likely inactive because actual network connection broke - ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError(); + // it is most likely inactive because actual network connection broke or was explicitly closed by the driver + + String terminationReason = terminationReason( ctx.channel() ); + ServiceUnavailableException error = ErrorUtil.newConnectionTerminatedError( terminationReason ); fail( ctx, error ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index a344aef336..4db6b84034 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -41,6 +41,8 @@ void runAndFlush( String statement, Map parameters, ResponseHandle CompletionStage release(); + void terminateAndRelease( String reason ); + BoltServerAddress serverAddress(); ServerVersion serverVersion(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java index e25512bc3d..a1280a5689 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java @@ -31,6 +31,15 @@ private ErrorUtil() { } + public static ServiceUnavailableException newConnectionTerminatedError( String reason ) + { + if ( reason == null ) + { + return newConnectionTerminatedError(); + } + return new ServiceUnavailableException( "Connection to the database terminated. " + reason ); + } + public static ServiceUnavailableException newConnectionTerminatedError() { return new ServiceUnavailableException( "Connection to the database terminated. " + diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java index c743f202e3..7f4c44efda 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java @@ -80,6 +80,11 @@ public static CompletableFuture failedFuture( Throwable error ) } public static V blockingGet( CompletionStage stage ) + { + return blockingGet( stage, Futures::noOpInterruptHandler ); + } + + public static V blockingGet( CompletionStage stage, Runnable interruptHandler ) { EventLoopGroupFactory.assertNotInEventLoopThread(); @@ -95,7 +100,14 @@ public static V blockingGet( CompletionStage stage ) } catch ( InterruptedException e ) { + // this thread was interrupted while waiting + // computation denoted by the future might still be running + interrupted = true; + + // run the interrupt handler and ignore if it throws + // need to wait for IO thread to actually finish, can't simply re-rethrow + safeRun( interruptHandler ); } catch ( ExecutionException e ) { @@ -112,6 +124,11 @@ public static V blockingGet( CompletionStage stage ) } } + public static T getNow( CompletionStage stage ) + { + return stage.toCompletableFuture().getNow( null ); + } + /** * Helper method to extract cause of a {@link CompletionException}. *

@@ -144,4 +161,19 @@ public static CompletionException asCompletionException( Throwable error ) } return new CompletionException( error ); } + + private static void safeRun( Runnable runnable ) + { + try + { + runnable.run(); + } + catch ( Throwable ignore ) + { + } + } + + private static void noOpInterruptHandler() + { + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java index 43e94c3fcb..84c42614db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java @@ -408,7 +408,7 @@ private StatementResult createResult( int numberOfRecords ) pullAllHandler.onSuccess( emptyMap() ); StatementResultCursor cursor = new InternalStatementResultCursor( runHandler, pullAllHandler ); - return new InternalStatementResult( cursor ); + return new InternalStatementResult( connection, cursor ); } private List values( Record record ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java index ab7a8d7038..9df27665be 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java @@ -41,6 +41,8 @@ import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.setServerAddress; import static org.neo4j.driver.internal.async.ChannelAttributes.setServerVersion; +import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason; +import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; import static org.neo4j.driver.internal.util.ServerVersion.version; public class ChannelAttributesTest @@ -159,4 +161,28 @@ public void shouldFailToSetServerVersionTwice() assertThat( e, instanceOf( IllegalStateException.class ) ); } } + + @Test + public void shouldSetAndGetTerminationReason() + { + String reason = "This channel has been terminated"; + setTerminationReason( channel, reason ); + assertEquals( reason, terminationReason( channel ) ); + } + + @Test + public void shouldFailToSetTerminationReasonTwice() + { + setTerminationReason( channel, "Reason 1" ); + + try + { + setTerminationReason( channel, "Reason 2" ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelErrorHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelErrorHandlerTest.java index fd4e522c04..1495696a0d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelErrorHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelErrorHandlerTest.java @@ -30,12 +30,14 @@ import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; +import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; public class ChannelErrorHandlerTest @@ -86,6 +88,22 @@ public void shouldHandleChannelInactiveAfterExceptionCaught() assertFalse( channel.isOpen() ); } + @Test + public void shouldHandleChannelInactiveWhenTerminationReasonSet() + { + String terminationReson = "Something really bad happened"; + setTerminationReason( channel, terminationReson ); + + channel.pipeline().fireChannelInactive(); + + Throwable error = messageDispatcher.currentError(); + + assertThat( error, instanceOf( ServiceUnavailableException.class ) ); + assertThat( error.getMessage(), startsWith( "Connection to the database terminated" ) ); + assertThat( error.getMessage(), containsString( terminationReson ) ); + assertFalse( channel.isOpen() ); + } + @Test public void shouldHandleCodecException() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index 44c076a783..9cc78e33db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -48,10 +48,14 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; +import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.messaging.ResetMessage.RESET; import static org.neo4j.driver.internal.util.Iterables.single; @@ -233,6 +237,102 @@ public void shouldReturnSameCompletionStageFromRelease() assertEquals( releaseStage2, releaseStage3 ); } + @Test + public void shouldEnableAutoRead() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( false ); + NettyConnection connection = newConnection( channel ); + + connection.enableAutoRead(); + + assertTrue( channel.config().isAutoRead() ); + } + + @Test + public void shouldDisableAutoRead() + { + EmbeddedChannel channel = new EmbeddedChannel(); + channel.config().setAutoRead( true ); + NettyConnection connection = newConnection( channel ); + + connection.disableAutoRead(); + + assertFalse( channel.config().isAutoRead() ); + } + + @Test + public void shouldSetTerminationReasonOnChannelWhenTerminated() + { + EmbeddedChannel channel = new EmbeddedChannel(); + NettyConnection connection = newConnection( channel ); + + String reason = "Something really bad has happened"; + connection.terminateAndRelease( reason ); + + assertEquals( reason, terminationReason( channel ) ); + } + + @Test + public void shouldCloseChannelWhenTerminated() + { + EmbeddedChannel channel = new EmbeddedChannel(); + NettyConnection connection = newConnection( channel ); + assertTrue( channel.isActive() ); + + connection.terminateAndRelease( "test" ); + + assertFalse( channel.isActive() ); + } + + @Test + public void shouldReleaseChannelWhenTerminated() + { + EmbeddedChannel channel = new EmbeddedChannel(); + ChannelPool pool = mock( ChannelPool.class ); + NettyConnection connection = newConnection( channel, pool ); + verify( pool, never() ).release( any() ); + + connection.terminateAndRelease( "test" ); + + verify( pool ).release( channel ); + } + + @Test + public void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() + { + EmbeddedChannel channel = new EmbeddedChannel(); + ChannelPool pool = mock( ChannelPool.class ); + NettyConnection connection = newConnection( channel, pool ); + verify( pool, never() ).release( any() ); + + connection.terminateAndRelease( "reason 1" ); + connection.terminateAndRelease( "reason 2" ); + connection.terminateAndRelease( "reason 3" ); + + // channel is terminated with the first termination reason + assertEquals( "reason 1", terminationReason( channel ) ); + // channel is released to the pool only once + verify( pool ).release( channel ); + } + + @Test + public void shouldNotReleaseAfterTermination() + { + EmbeddedChannel channel = new EmbeddedChannel(); + ChannelPool pool = mock( ChannelPool.class ); + NettyConnection connection = newConnection( channel, pool ); + verify( pool, never() ).release( any() ); + + connection.terminateAndRelease( "test" ); + CompletionStage releaseStage = connection.release(); + + // release stage should be completed immediately + assertTrue( releaseStage.toCompletableFuture().isDone() ); + // channel is released to the pool only once + verify( pool ).release( channel ); + } + private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception { EmbeddedChannel channel = spy( new EmbeddedChannel() ); @@ -267,9 +367,14 @@ private void shutdownEventLoop() throws Exception } } - private static NettyConnection newConnection( EmbeddedChannel channel ) + private static NettyConnection newConnection( Channel channel ) + { + return newConnection( channel, mock( ChannelPool.class ) ); + } + + private static NettyConnection newConnection( Channel channel, ChannelPool pool ) { - return new NettyConnection( channel, mock( ChannelPool.class ), new FakeClock() ); + return new NettyConnection( channel, pool, new FakeClock() ); } private static void assertConnectionReleasedError( IllegalStateException e ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ErrorUtilTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/ErrorUtilTest.java index 5972928e0f..6a6e619cbc 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ErrorUtilTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ErrorUtilTest.java @@ -26,14 +26,18 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.DatabaseException; import org.neo4j.driver.v1.exceptions.Neo4jException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.TransientException; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.neo4j.driver.internal.util.ErrorUtil.isFatal; +import static org.neo4j.driver.internal.util.ErrorUtil.newConnectionTerminatedError; import static org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError; public class ErrorUtilTest @@ -134,4 +138,27 @@ public void shouldTreatDatabaseExceptionAsFatal() { assertTrue( isFatal( new ClientException( "Neo.DatabaseError.Schema.ConstraintCreationFailed", "" ) ) ); } + + @Test + public void shouldCreateConnectionTerminatedError() + { + ServiceUnavailableException error = newConnectionTerminatedError(); + assertThat( error.getMessage(), startsWith( "Connection to the database terminated" ) ); + } + + @Test + public void shouldCreateConnectionTerminatedErrorWithNullReason() + { + ServiceUnavailableException error = newConnectionTerminatedError( null ); + assertThat( error.getMessage(), startsWith( "Connection to the database terminated" ) ); + } + + @Test + public void shouldCreateConnectionTerminatedErrorWithReason() + { + String reason = "Thread interrupted"; + ServiceUnavailableException error = newConnectionTerminatedError( reason ); + assertThat( error.getMessage(), startsWith( "Connection to the database terminated" ) ); + assertThat( error.getMessage(), containsString( reason ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java index d314fc6c46..7cf0145df2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java @@ -146,6 +146,12 @@ public CompletionStage release() return delegate.release(); } + @Override + public void terminateAndRelease( String reason ) + { + delegate.terminateAndRelease( reason ); + } + @Override public BoltServerAddress serverAddress() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index d333f8b0ef..880825490f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -30,12 +30,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -46,6 +48,7 @@ import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthToken; @@ -68,6 +71,7 @@ import org.neo4j.driver.v1.util.TestNeo4j; import static java.lang.String.format; +import static java.util.concurrent.CompletableFuture.runAsync; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -1451,6 +1455,64 @@ public void shouldAllowToConsumeRecordsSlowlyAndRetrieveSummary() throws Interru } } + @Test + public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() + { + try ( Session session1 = neo4j.driver().session(); + Session session2 = neo4j.driver().session() ) + { + session1.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); + + Transaction tx = session1.beginTransaction(); + tx.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Mjolnir'" ).consume(); + + // now 'Beta Ray Bill' node is locked + + AtomicBoolean interruptedQueryFailed = new AtomicBoolean(); + Thread testThread = Thread.currentThread(); + CompletableFuture interruptFuture = runAsync( () -> + { + while ( !interruptedQueryFailed.get() ) + { + // spin until thread that executes the test goes to WAITING state + do + { + try + { + Thread.sleep( 5_00 ); + } + catch ( InterruptedException ignore ) + { + } + } + while ( testThread.getState() != Thread.State.WAITING ); + + testThread.interrupt(); + } + } ); + + try + { + session2.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Stormbreaker'" ).consume(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) ); + assertThat( e.getMessage(), containsString( "Thread interrupted" ) ); + } + finally + { + // stop task that perform interruptions + interruptedQueryFailed.set( true ); + Futures.blockingGet( interruptFuture ); + + // clear interrupted flag + Thread.interrupted(); + } + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); From 2bc785eb9d9bbed72d076af4590703188873023b Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 11 Dec 2017 16:04:22 +0100 Subject: [PATCH 2/4] More tests for thread interruption handling Added couple more integrations tests to verify that `Thread#interrupt()` terminates active connection and "unblocks" the blocking thread: * when driver is created towards unresponsive server * when transaction blocks to read back the result * when transaction blocks to commit Also `NettyConnection` will now fail provided handlers except throwing exception directly. Increased test coverage for future helpers. --- .../neo4j/driver/internal/DriverFactory.java | 10 +- .../driver/internal/ExplicitTransaction.java | 12 +- .../neo4j/driver/internal/NetworkSession.java | 2 +- .../internal/async/NettyConnection.java | 49 ++- .../internal/async/NettyConnectionTest.java | 68 +++- .../driver/internal/util/FuturesTest.java | 348 ++++++++++++++++++ .../neo4j/driver/v1/GraphDatabaseTest.java | 26 ++ .../driver/v1/integration/SessionIT.java | 35 +- .../driver/v1/integration/TransactionIT.java | 75 ++++ .../org/neo4j/driver/v1/util/TestUtil.java | 28 ++ .../org/neo4j/driver/v1/util/cc/Cluster.java | 14 +- 11 files changed, 588 insertions(+), 79 deletions(-) create mode 100644 driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 6bb5a00838..6eb60226e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -84,7 +84,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r eventExecutorGroup, securityPlan, retryLogic ); // block to verify connectivity, close connection pool if thread gets interrupted - Futures.blockingGet( driver.verifyConnectivity(), connectionPool::close ); + Futures.blockingGet( driver.verifyConnectivity(), + () -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) ); return driver; } catch ( Throwable driverError ) @@ -315,4 +316,11 @@ private static void assertNoRoutingContext( URI uri, RoutingSettings routingSett "Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" ); } } + + private static void closeConnectionPoolOnThreadInterrupt( ConnectionPool pool, Logging logging ) + { + Logger log = logging.getLog( Driver.class.getSimpleName() ); + log.warn( "Driver creation interrupted while verifying connectivity. Connection pool will be closed" ); + pool.close(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index fd35de583a..48dff4242f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -369,7 +369,17 @@ private BiFunction handleCommitOrRollback( Throwable cursor { return ( ignore, commitOrRollbackError ) -> { - if ( cursorFailure != null ) + if ( cursorFailure != null && commitOrRollbackError != null ) + { + Throwable cause1 = completionErrorCause( cursorFailure ); + Throwable cause2 = completionErrorCause( commitOrRollbackError ); + if ( cause1 != cause2 ) + { + cause1.addSuppressed( cause2 ); + } + throw new CompletionException( cause1 ); + } + else if ( cursorFailure != null ) { throw Futures.asCompletionException( cursorFailure ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 9afa9e9f0b..0d3cea6c77 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -528,7 +528,7 @@ private CompletionStage releaseConnection() private void terminateConnectionOnThreadInterrupt( String reason ) { - // try to get current connection in a blocking fashion + // try to get current connection if it has been acquired Connection connection = null; try { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index b9a1ee107c..44a56f020a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; @@ -51,7 +51,7 @@ public class NettyConnection implements Connection private final CompletableFuture releaseFuture; private final Clock clock; - private final AtomicBoolean open = new AtomicBoolean( true ); + private final AtomicReference status = new AtomicReference<>( Status.OPEN ); public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) { @@ -67,7 +67,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock ) @Override public boolean isOpen() { - return open.get(); + return status.get() == Status.OPEN; } @Override @@ -92,22 +92,26 @@ public void disableAutoRead() public void run( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { - assertOpen(); - run( statement, parameters, runHandler, pullAllHandler, false ); + if ( verifyOpen( runHandler, pullAllHandler ) ) + { + run( statement, parameters, runHandler, pullAllHandler, false ); + } } @Override public void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ) { - assertOpen(); - run( statement, parameters, runHandler, pullAllHandler, true ); + if ( verifyOpen( runHandler, pullAllHandler ) ) + { + run( statement, parameters, runHandler, pullAllHandler, true ); + } } @Override public CompletionStage release() { - if ( open.compareAndSet( true, false ) ) + if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) ) { // auto-read could've been disabled, re-enable it to automatically receive response for RESET setAutoRead( true ); @@ -120,7 +124,7 @@ public CompletionStage release() @Override public void terminateAndRelease( String reason ) { - if ( open.compareAndSet( true, false ) ) + if ( status.compareAndSet( Status.OPEN, Status.TERMINATED ) ) { setTerminationReason( channel, reason ); channel.close(); @@ -192,11 +196,32 @@ private void setAutoRead( boolean value ) channel.config().setAutoRead( value ); } - private void assertOpen() + private boolean verifyOpen( ResponseHandler runHandler, ResponseHandler pullAllHandler ) { - if ( !isOpen() ) + Status connectionStatus = this.status.get(); + switch ( connectionStatus ) { - throw new IllegalStateException( "Connection has been released to the pool and can't be reused" ); + case OPEN: + return true; + case RELEASED: + Exception error = new IllegalStateException( "Connection has been released to the pool and can't be used" ); + runHandler.onFailure( error ); + pullAllHandler.onFailure( error ); + return false; + case TERMINATED: + Exception terminatedError = new IllegalStateException( "Connection has been terminated and can't be used" ); + runHandler.onFailure( terminatedError ); + pullAllHandler.onFailure( terminatedError ); + return false; + default: + throw new IllegalStateException( "Unknown status: " + connectionStatus ); } } + + private enum Status + { + OPEN, + RELEASED, + TERMINATED + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index 9cc78e33db..0b706bc800 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -26,6 +26,7 @@ import io.netty.util.internal.ConcurrentSet; import org.junit.After; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.util.Set; import java.util.concurrent.CompletionStage; @@ -47,7 +48,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -153,38 +153,61 @@ public void shouldNotDisableAutoReadWhenReleased() @Test public void shouldNotRunWhenReleased() { + ResponseHandler runHandler = mock( ResponseHandler.class ); + ResponseHandler pullAllHandler = mock( ResponseHandler.class ); NettyConnection connection = newConnection( new EmbeddedChannel() ); connection.release(); + connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); - try - { - connection.run( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), mock( ResponseHandler.class ) ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertConnectionReleasedError( e ); - } + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class ); + verify( runHandler ).onFailure( failureCaptor.capture() ); + assertConnectionReleasedError( failureCaptor.getValue() ); } @Test public void shouldNotRunAndFlushWhenReleased() { + ResponseHandler runHandler = mock( ResponseHandler.class ); + ResponseHandler pullAllHandler = mock( ResponseHandler.class ); NettyConnection connection = newConnection( new EmbeddedChannel() ); connection.release(); + connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); - try - { - connection.runAndFlush( "RETURN 1", emptyMap(), mock( ResponseHandler.class ), - mock( ResponseHandler.class ) ); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertConnectionReleasedError( e ); - } + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class ); + verify( runHandler ).onFailure( failureCaptor.capture() ); + assertConnectionReleasedError( failureCaptor.getValue() ); + } + + @Test + public void shouldNotRunWhenTerminated() + { + ResponseHandler runHandler = mock( ResponseHandler.class ); + ResponseHandler pullAllHandler = mock( ResponseHandler.class ); + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.terminateAndRelease( "42" ); + connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); + + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class ); + verify( runHandler ).onFailure( failureCaptor.capture() ); + assertConnectionTerminatedError( failureCaptor.getValue() ); + } + + @Test + public void shouldNotRunAndFlushWhenTerminated() + { + ResponseHandler runHandler = mock( ResponseHandler.class ); + ResponseHandler pullAllHandler = mock( ResponseHandler.class ); + NettyConnection connection = newConnection( new EmbeddedChannel() ); + + connection.terminateAndRelease( "42" ); + connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); + + ArgumentCaptor failureCaptor = ArgumentCaptor.forClass( IllegalStateException.class ); + verify( runHandler ).onFailure( failureCaptor.capture() ); + assertConnectionTerminatedError( failureCaptor.getValue() ); } @Test @@ -382,6 +405,11 @@ private static void assertConnectionReleasedError( IllegalStateException e ) assertThat( e.getMessage(), startsWith( "Connection has been released" ) ); } + private static void assertConnectionTerminatedError( IllegalStateException e ) + { + assertThat( e.getMessage(), startsWith( "Connection has been terminated" ) ); + } + private static class ThreadTrackingInboundMessageDispatcher extends InboundMessageDispatcher { diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java new file mode 100644 index 0000000000..f8a7431e27 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java @@ -0,0 +1,348 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.FailedFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.SucceededFuture; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.neo4j.driver.internal.async.EventLoopGroupFactory; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.driver.internal.util.Matchers.blockingOperationInEventLoopError; +import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; +import static org.neo4j.driver.v1.util.TestUtil.sleep; + +public class FuturesTest +{ + @Test + public void shouldConvertCanceledNettyFutureToCompletionStage() throws Exception + { + DefaultPromise promise = new DefaultPromise<>( ImmediateEventExecutor.INSTANCE ); + promise.cancel( true ); + + CompletableFuture future = Futures.asCompletionStage( promise ).toCompletableFuture(); + + assertTrue( future.isCancelled() ); + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( CancellationException ignore ) + { + // expected + } + } + + @Test + public void shouldConvertSucceededNettyFutureToCompletionStage() throws Exception + { + SucceededFuture nettyFuture = new SucceededFuture<>( ImmediateEventExecutor.INSTANCE, "Hello" ); + + CompletableFuture future = Futures.asCompletionStage( nettyFuture ).toCompletableFuture(); + + assertTrue( future.isDone() ); + assertFalse( future.isCompletedExceptionally() ); + assertEquals( "Hello", future.get() ); + } + + @Test + public void shouldConvertFailedNettyFutureToCompletionStage() throws Exception + { + RuntimeException error = new RuntimeException( "Hello" ); + FailedFuture nettyFuture = new FailedFuture<>( ImmediateEventExecutor.INSTANCE, error ); + + CompletableFuture future = Futures.asCompletionStage( nettyFuture ).toCompletableFuture(); + + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertEquals( error, e.getCause() ); + } + } + + @Test + public void shouldConvertRunningNettyFutureToCompletionStageWhenFutureCanceled() throws Exception + { + DefaultPromise promise = new DefaultPromise<>( ImmediateEventExecutor.INSTANCE ); + + CompletableFuture future = Futures.asCompletionStage( promise ).toCompletableFuture(); + + assertFalse( future.isDone() ); + promise.cancel( true ); + + assertTrue( future.isCancelled() ); + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( CancellationException ignore ) + { + // expected + } + } + + @Test + public void shouldConvertRunningNettyFutureToCompletionStageWhenFutureSucceeded() throws Exception + { + DefaultPromise promise = new DefaultPromise<>( ImmediateEventExecutor.INSTANCE ); + + CompletableFuture future = Futures.asCompletionStage( promise ).toCompletableFuture(); + + assertFalse( future.isDone() ); + promise.setSuccess( "Hello" ); + + assertTrue( future.isDone() ); + assertFalse( future.isCompletedExceptionally() ); + assertEquals( "Hello", future.get() ); + } + + @Test + public void shouldConvertRunningNettyFutureToCompletionStageWhenFutureFailed() throws Exception + { + RuntimeException error = new RuntimeException( "Hello" ); + DefaultPromise promise = new DefaultPromise<>( ImmediateEventExecutor.INSTANCE ); + + CompletableFuture future = Futures.asCompletionStage( promise ).toCompletableFuture(); + + assertFalse( future.isDone() ); + promise.setFailure( error ); + + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertEquals( error, e.getCause() ); + } + } + + @Test + public void shouldCreateFailedFutureWithUncheckedException() throws Exception + { + RuntimeException error = new RuntimeException( "Hello" ); + CompletableFuture future = Futures.failedFuture( error ).toCompletableFuture(); + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertEquals( error, e.getCause() ); + } + } + + @Test + public void shouldCreateFailedFutureWithCheckedException() throws Exception + { + IOException error = new IOException( "Hello" ); + CompletableFuture future = Futures.failedFuture( error ).toCompletableFuture(); + assertTrue( future.isCompletedExceptionally() ); + try + { + future.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertEquals( error, e.getCause() ); + } + } + + @Test + public void shouldFailBlockingGetInEventLoopThread() throws Exception + { + EventLoopGroup eventExecutor = EventLoopGroupFactory.newEventLoopGroup( 1 ); + try + { + CompletableFuture future = new CompletableFuture<>(); + Future result = eventExecutor.submit( () -> Futures.blockingGet( future ) ); + + try + { + result.get(); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), is( blockingOperationInEventLoopError() ) ); + } + } + finally + { + eventExecutor.shutdownGracefully(); + } + } + + @Test + public void shouldThrowInBlockingGetWhenFutureThrowsUncheckedException() + { + RuntimeException error = new RuntimeException( "Hello" ); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( error ); + + try + { + Futures.blockingGet( future ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldThrowInBlockingGetWhenFutureThrowsCheckedException() + { + IOException error = new IOException( "Hello" ); + + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally( error ); + + try + { + Futures.blockingGet( future ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + } + + @Test + public void shouldReturnFromBlockingGetWhenFutureCompletes() + { + CompletableFuture future = new CompletableFuture<>(); + future.complete( "Hello" ); + + assertEquals( "Hello", Futures.blockingGet( future ) ); + } + + @Test + public void shouldWaitForFutureInBlockingGetEvenWhenInterrupted() + { + ExecutorService executor = Executors.newSingleThreadExecutor( daemon( "InterruptThread" ) ); + try + { + CompletableFuture future = new CompletableFuture<>(); + + Thread.currentThread().interrupt(); + executor.submit( () -> + { + sleep( 1_000 ); + future.complete( "Hello" ); + } ); + + assertEquals( "Hello", Futures.blockingGet( future ) ); + assertTrue( Thread.currentThread().isInterrupted() ); + } + finally + { + Thread.interrupted(); // clear interruption status + executor.shutdown(); + } + } + + @Test + public void shouldHandleInterruptsInBlockingGet() + { + try + { + CompletableFuture future = new CompletableFuture<>(); + Thread.currentThread().interrupt(); + + Runnable interruptHandler = () -> future.complete( "Hello" ); + assertEquals( "Hello", Futures.blockingGet( future, interruptHandler ) ); + assertTrue( Thread.currentThread().isInterrupted() ); + } + finally + { + Thread.interrupted(); // clear interruption status + } + } + + @Test + public void shouldGetNowWhenFutureDone() + { + CompletableFuture future = new CompletableFuture<>(); + future.complete( "Hello" ); + + assertEquals( "Hello", Futures.getNow( future ) ); + } + + @Test + public void shouldGetNowWhenFutureNotDone() + { + CompletableFuture future = new CompletableFuture<>(); + + assertNull( Futures.getNow( future ) ); + } + + @Test + public void shouldGetCauseFromCompletionException() + { + RuntimeException error = new RuntimeException( "Hello" ); + CompletionException completionException = new CompletionException( error ); + + assertEquals( error, Futures.completionErrorCause( completionException ) ); + } + + @Test + public void shouldReturnSameExceptionWhenItIsNotCompletionException() + { + RuntimeException error = new RuntimeException( "Hello" ); + + assertEquals( error, Futures.completionErrorCause( error ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index 9cdd0d606c..17cc334316 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -21,11 +21,13 @@ import org.junit.Test; import java.io.File; +import java.net.ServerSocket; import java.net.URI; import java.util.List; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.StubServer; +import org.neo4j.driver.v1.util.TestUtil; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.is; @@ -156,4 +158,28 @@ public void shouldLogWhenUnableToCreateRoutingDriver() throws Exception assertEquals( 0, server1.exitStatus() ); assertEquals( 0, server2.exitStatus() ); } + + @Test + public void shouldRespondToInterruptsWhenConnectingToUnresponsiveServer() throws Exception + { + try ( ServerSocket serverSocket = new ServerSocket( 0 ) ) + { + // setup other thread to interrupt current thread when it blocks + TestUtil.interruptWhenInWaitingState( Thread.currentThread() ); + try + { + GraphDatabase.driver( "bolt://localhost:" + serverSocket.getLocalPort() ); + fail( "Exception expected" ); + } + catch ( Exception ignore ) + { + // expected + } + finally + { + // clear interrupted flag + Thread.interrupted(); + } + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 880825490f..6b9ef62f1f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -30,14 +30,12 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -48,7 +46,6 @@ import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; -import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthToken; @@ -69,9 +66,9 @@ import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.util.TestNeo4j; +import org.neo4j.driver.v1.util.TestUtil; import static java.lang.String.format; -import static java.util.concurrent.CompletableFuture.runAsync; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -1456,7 +1453,7 @@ public void shouldAllowToConsumeRecordsSlowlyAndRetrieveSummary() throws Interru } @Test - public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() + public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception { try ( Session session1 = neo4j.driver().session(); Session session2 = neo4j.driver().session() ) @@ -1468,28 +1465,8 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() // now 'Beta Ray Bill' node is locked - AtomicBoolean interruptedQueryFailed = new AtomicBoolean(); - Thread testThread = Thread.currentThread(); - CompletableFuture interruptFuture = runAsync( () -> - { - while ( !interruptedQueryFailed.get() ) - { - // spin until thread that executes the test goes to WAITING state - do - { - try - { - Thread.sleep( 5_00 ); - } - catch ( InterruptedException ignore ) - { - } - } - while ( testThread.getState() != Thread.State.WAITING ); - - testThread.interrupt(); - } - } ); + // setup other thread to interrupt current thread when it blocks + TestUtil.interruptWhenInWaitingState( Thread.currentThread() ); try { @@ -1503,10 +1480,6 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() } finally { - // stop task that perform interruptions - interruptedQueryFailed.set( true ); - Futures.blockingGet( interruptFuture ); - // clear interrupted flag Thread.interrupted(); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index 4f60697008..af3388473c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -28,15 +28,19 @@ import java.util.concurrent.TimeUnit; import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.TestNeo4jSession; +import org.neo4j.driver.v1.util.TestUtil; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -383,4 +387,75 @@ public void shouldPropagateFailureFromSummary() assertNotNull( result.summary() ); } } + + @Test + public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception + { + try ( Session otherSession = this.session.driver().session() ) + { + session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); + + Transaction tx1 = session.beginTransaction(); + Transaction tx2 = otherSession.beginTransaction(); + tx1.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Mjolnir'" ).consume(); + + // now 'Beta Ray Bill' node is locked + + // setup other thread to interrupt current thread when it blocks + TestUtil.interruptWhenInWaitingState( Thread.currentThread() ); + + try + { + tx2.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Stormbreaker'" ).consume(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) ); + assertThat( e.getMessage(), containsString( "Thread interrupted while waiting for result to arrive" ) ); + } + finally + { + // clear interrupted flag + Thread.interrupted(); + } + } + } + + @Test + public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exception + { + try ( Session otherSession = this.session.driver().session() ) + { + session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); + + Transaction tx1 = session.beginTransaction(); + Transaction tx2 = otherSession.beginTransaction(); + tx1.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Mjolnir'" ).consume(); + + // now 'Beta Ray Bill' node is locked + + tx2.run( "MATCH (n:Person {name: 'Beta Ray Bill'}) SET n.hammer = 'Stormbreaker'" ); + tx2.success(); + + // setup other thread to interrupt current thread when it blocks + TestUtil.interruptWhenInWaitingState( Thread.currentThread() ); + + try + { + tx2.close(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( e.getMessage(), + "Connection to the database terminated. Thread interrupted while closing the transaction" ); + } + finally + { + // clear interrupted flag + Thread.interrupted(); + } + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index ddbf8ad9b2..a15e084e7f 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -172,6 +172,34 @@ public static Connection connectionMock() return connection; } + public static void sleep( int millis ) + { + try + { + Thread.sleep( millis ); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new RuntimeException( e ); + } + } + + public static void interruptWhenInWaitingState( Thread thread ) + { + CompletableFuture.runAsync( () -> + { + // spin until given thread moves to WAITING state + do + { + sleep( 500 ); + } + while ( thread.getState() != Thread.State.WAITING ); + + thread.interrupt(); + } ); + } + private static void setupSuccessfulPullAll( Connection connection, String statement ) { doAnswer( invocation -> diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java index 88d13b8b79..6f9aa7e960 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java @@ -44,6 +44,7 @@ import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.v1.Config.TrustStrategy.trustAllCertificates; +import static org.neo4j.driver.v1.util.TestUtil.sleep; public class Cluster { @@ -438,17 +439,4 @@ private static ClusterMember randomOf( Set members ) } throw new AssertionError(); } - - private static void sleep( int millis ) - { - try - { - Thread.sleep( millis ); - } - catch ( InterruptedException e ) - { - Thread.currentThread().interrupt(); - throw new RuntimeException( e ); - } - } } From cc8fc7ec4eeaee5108852ba0cdd7e2e9e33a51a1 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 13 Dec 2017 14:42:10 +0100 Subject: [PATCH 3/4] Throw ServiceUnavailableException when unable to connect Driver verifies connectivity on creation. It closes the connection pool when thread is interrupted. Previously original `IllegalStateException` from the closed pool has been propagated to the user. This commit makes connectivity verification always throw `ServiceUnavailableException`. --- .../neo4j/driver/internal/DriverFactory.java | 96 ++++++++++++------- .../neo4j/driver/v1/GraphDatabaseTest.java | 2 +- 2 files changed, 65 insertions(+), 33 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 6eb60226e6..97d85bdde8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -53,6 +53,7 @@ import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.lang.String.format; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; @@ -78,29 +79,12 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config ); - try - { - InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings, - eventExecutorGroup, securityPlan, retryLogic ); + InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings, + eventExecutorGroup, securityPlan, retryLogic ); - // block to verify connectivity, close connection pool if thread gets interrupted - Futures.blockingGet( driver.verifyConnectivity(), - () -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) ); - return driver; - } - catch ( Throwable driverError ) - { - // we need to close the connection pool if driver creation threw exception - try - { - Futures.blockingGet( connectionPool.close() ); - } - catch ( Throwable closeError ) - { - driverError.addSuppressed( closeError ); - } - throw driverError; - } + verifyConnectivity( driver, connectionPool, config ); + + return driver; } protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, @@ -126,17 +110,26 @@ private InternalDriver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool, Config config, RoutingSettings routingSettings, EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic ) { - String scheme = uri.getScheme().toLowerCase(); - switch ( scheme ) + try { - case BOLT_URI_SCHEME: - assertNoRoutingContext( uri, routingSettings ); - return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool ); - case BOLT_ROUTING_URI_SCHEME: - return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic, - eventExecutorGroup ); - default: - throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); + String scheme = uri.getScheme().toLowerCase(); + switch ( scheme ) + { + case BOLT_URI_SCHEME: + assertNoRoutingContext( uri, routingSettings ); + return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool ); + case BOLT_ROUTING_URI_SCHEME: + return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic, + eventExecutorGroup ); + default: + throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); + } + } + catch ( Throwable driverError ) + { + // we need to close the connection pool if driver creation threw exception + closeConnectionPoolAndSuppressError( connectionPool, driverError ); + throw driverError; } } @@ -317,6 +310,45 @@ private static void assertNoRoutingContext( URI uri, RoutingSettings routingSett } } + private static void verifyConnectivity( InternalDriver driver, ConnectionPool connectionPool, Config config ) + { + try + { + // block to verify connectivity, close connection pool if thread gets interrupted + Futures.blockingGet( driver.verifyConnectivity(), + () -> closeConnectionPoolOnThreadInterrupt( connectionPool, config.logging() ) ); + } + catch ( Throwable connectionError ) + { + if ( Thread.currentThread().isInterrupted() ) + { + // current thread has been interrupted while verifying connectivity + // connection pool should've been closed + throw new ServiceUnavailableException( "Unable to create driver. Thread has been interrupted.", + connectionError ); + } + + // we need to close the connection pool if driver creation threw exception + closeConnectionPoolAndSuppressError( connectionPool, connectionError ); + throw connectionError; + } + } + + private static void closeConnectionPoolAndSuppressError( ConnectionPool connectionPool, Throwable mainError ) + { + try + { + Futures.blockingGet( connectionPool.close() ); + } + catch ( Throwable closeError ) + { + if ( mainError != closeError ) + { + mainError.addSuppressed( closeError ); + } + } + } + private static void closeConnectionPoolOnThreadInterrupt( ConnectionPool pool, Logging logging ) { Logger log = logging.getLog( Driver.class.getSimpleName() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index 17cc334316..b31ebd61c7 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -171,7 +171,7 @@ public void shouldRespondToInterruptsWhenConnectingToUnresponsiveServer() throws GraphDatabase.driver( "bolt://localhost:" + serverSocket.getLocalPort() ); fail( "Exception expected" ); } - catch ( Exception ignore ) + catch ( ServiceUnavailableException ignore ) { // expected } From c788f45108b8d10c69d6c105148c6b2f5750c760 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 13 Dec 2017 20:53:31 +0100 Subject: [PATCH 4/4] Fixed couple errors after rebase Also added unit tests for `Futures#asCompletionException()`. --- .../driver/internal/ExplicitTransaction.java | 6 +++--- .../driver/internal/util/FuturesTest.java | 19 +++++++++++++++++-- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 48dff4242f..a76890f580 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -371,13 +371,13 @@ private BiFunction handleCommitOrRollback( Throwable cursor { if ( cursorFailure != null && commitOrRollbackError != null ) { - Throwable cause1 = completionErrorCause( cursorFailure ); - Throwable cause2 = completionErrorCause( commitOrRollbackError ); + Throwable cause1 = Futures.completionExceptionCause( cursorFailure ); + Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError ); if ( cause1 != cause2 ) { cause1.addSuppressed( cause2 ); } - throw new CompletionException( cause1 ); + throw Futures.asCompletionException( cause1 ); } else if ( cursorFailure != null ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java index f8a7431e27..3db116f3b6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java @@ -335,7 +335,7 @@ public void shouldGetCauseFromCompletionException() RuntimeException error = new RuntimeException( "Hello" ); CompletionException completionException = new CompletionException( error ); - assertEquals( error, Futures.completionErrorCause( completionException ) ); + assertEquals( error, Futures.completionExceptionCause( completionException ) ); } @Test @@ -343,6 +343,21 @@ public void shouldReturnSameExceptionWhenItIsNotCompletionException() { RuntimeException error = new RuntimeException( "Hello" ); - assertEquals( error, Futures.completionErrorCause( error ) ); + assertEquals( error, Futures.completionExceptionCause( error ) ); + } + + @Test + public void shouldWrapWithCompletionException() + { + RuntimeException error = new RuntimeException( "Hello" ); + CompletionException completionException = Futures.asCompletionException( error ); + assertEquals( error, completionException.getCause() ); + } + + @Test + public void shouldKeepCompletionExceptionAsIs() + { + CompletionException error = new CompletionException( new RuntimeException( "Hello" ) ); + assertEquals( error, Futures.asCompletionException( error ) ); } }