From c78518fa6a76ef5c2e794cca6347a98b6ab506e6 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 9 Jan 2018 14:21:33 +0100 Subject: [PATCH 1/5] Fixed error when connection killed during tx Connection error can happen while transaction is executing. Closing transaction after such error should not perform rollback because connection is dead and database has cleaned up all resources. Previously close operation tried to perform rollback and failed with same `ServiceUnavailable` exception as the unsuccessful `#run()`. Error thrown from `#close()` has to be added as a suppressed error to the one thrown from `#run()`, when used in try-with-resources block. So code attempted to add error to itself as suppressed. This resulted in an `IllegalArgumentException`. This commit fixes the problem by making `#close()` not perform a rollback after a fatal connection error. --- .../driver/internal/ExplicitTransaction.java | 29 ++++----- .../TransactionPullAllResponseHandler.java | 10 ++- .../internal/ExplicitTransactionTest.java | 12 ++++ ...TransactionPullAllResponseHandlerTest.java | 37 ++++++++++- .../util/ChannelTrackingDriverFactory.java | 14 ++++ .../driver/v1/integration/SessionIT.java | 3 +- .../v1/integration/TransactionAsyncIT.java | 10 +-- .../driver/v1/integration/TransactionIT.java | 64 +++++++++++++++++-- 8 files changed, 147 insertions(+), 32 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index d851cf226b..71c66d2d98 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; import java.util.function.BiFunction; import org.neo4j.driver.internal.async.QueryRunner; @@ -69,7 +68,8 @@ private enum State MARKED_FAILED( true ), /** - * This transaction has been explicitly terminated by calling {@link Session#reset()}. + * This transaction has been terminated either because of explicit {@link Session#reset()} or because of a + * fatal connection error. */ TERMINATED( false ), @@ -181,14 +181,13 @@ else if ( state == State.ROLLED_BACK ) } else if ( state == State.TERMINATED ) { - return failedFuture( - new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) ); + return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); } else { return resultCursors.retrieveNotConsumedError() .thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) ) - .whenComplete( transactionClosed( State.COMMITTED ) ); + .whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) ); } } @@ -205,15 +204,15 @@ else if ( state == State.ROLLED_BACK ) } else if ( state == State.TERMINATED ) { - // transaction has been terminated by RESET and should be rolled back by the database - state = State.ROLLED_BACK; + // no need for explicit rollback, transaction should've been rolled back by the database + transactionClosed( State.ROLLED_BACK ); return completedWithNull(); } else { return resultCursors.retrieveNotConsumedError() .thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) ) - .whenComplete( transactionClosed( State.ROLLED_BACK ) ); + .whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) ); } } @@ -314,8 +313,7 @@ else if ( state == State.MARKED_FAILED ) } else if ( state == State.TERMINATED ) { - throw new ClientException( - "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); + throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" ); } } @@ -394,14 +392,11 @@ else if ( commitOrRollbackError != null ) }; } - private BiConsumer transactionClosed( State newState ) + private void transactionClosed( State newState ) { - return ( ignore, error ) -> - { - state = newState; - connection.release(); // release in background - session.setBookmark( bookmark ); - }; + state = newState; + connection.release(); // release in background + session.setBookmark( bookmark ); } private void terminateConnectionOnThreadInterrupt( String reason ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java index d0fc63f71c..d153601005 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandler.java @@ -20,6 +20,7 @@ import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.ErrorUtil; import org.neo4j.driver.v1.Statement; import static java.util.Objects.requireNonNull; @@ -43,6 +44,13 @@ protected void afterSuccess() @Override protected void afterFailure( Throwable error ) { - tx.failure(); + if ( ErrorUtil.isFatal( error ) ) + { + tx.markTerminated(); + } + else + { + tx.failure(); + } } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 56f86753e5..c1366eac51 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -256,6 +256,18 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds() verify( connection, never() ).release(); } + @Test + public void shouldReleaseConnectionWhenTerminatedAndRolledBack() + { + Connection connection = connectionMock(); + ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); + + tx.markTerminated(); + await( tx.rollbackAsync() ); + + verify( connection ).release(); + } + private static ExplicitTransaction beginTx( Connection connection ) { return beginTx( connection, Bookmark.empty() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java index 59b635357a..073ad596b9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullAllResponseHandlerTest.java @@ -20,6 +20,7 @@ import org.junit.Test; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.BoltServerAddress; @@ -27,6 +28,10 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Statement; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.exceptions.TransientException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -35,7 +40,26 @@ public class TransactionPullAllResponseHandlerTest { @Test - public void shouldMarkTransactionAsFailedOnFailure() + public void shouldMarkTransactionAsFailedOnNonFatalFailures() + { + testErrorHandling( new ClientException( "Neo.ClientError.Cluster.NotALeader", "" ), false ); + testErrorHandling( new ClientException( "Neo.ClientError.Procedure.ProcedureCallFailed", "" ), false ); + testErrorHandling( new TransientException( "Neo.TransientError.Transaction.Terminated", "" ), false ); + testErrorHandling( new TransientException( "Neo.TransientError.General.DatabaseUnavailable", "" ), false ); + } + + @Test + public void shouldMarkTransactionAsTerminatedOnFatalFailures() + { + testErrorHandling( new RuntimeException(), true ); + testErrorHandling( new IOException(), true ); + testErrorHandling( new ServiceUnavailableException( "" ), true ); + testErrorHandling( new SessionExpiredException( "" ), true ); + testErrorHandling( new SessionExpiredException( "" ), true ); + testErrorHandling( new ClientException( "Neo.ClientError.Request.Invalid" ), true ); + } + + private static void testErrorHandling( Throwable error, boolean fatal ) { Connection connection = mock( Connection.class ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); @@ -44,8 +68,15 @@ public void shouldMarkTransactionAsFailedOnFailure() TransactionPullAllResponseHandler handler = new TransactionPullAllResponseHandler( new Statement( "RETURN 1" ), new RunResponseHandler( new CompletableFuture<>() ), connection, tx ); - handler.onFailure( new RuntimeException() ); + handler.onFailure( error ); - verify( tx ).failure(); + if ( fatal ) + { + verify( tx ).markTerminated(); + } + else + { + verify( tx ).failure(); + } } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java index 7c8cac4ddb..59b047d28b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ChannelTrackingDriverFactory.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.async.BootstrapFactory; import org.neo4j.driver.internal.async.ChannelConnector; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -36,11 +37,24 @@ public class ChannelTrackingDriverFactory extends DriverFactoryWithClock { private final List channels = new CopyOnWriteArrayList<>(); + private final int eventLoopThreads; private ConnectionPool pool; public ChannelTrackingDriverFactory( Clock clock ) + { + this( 0, clock ); + } + + public ChannelTrackingDriverFactory( int eventLoopThreads, Clock clock ) { super( clock ); + this.eventLoopThreads = eventLoopThreads; + } + + @Override + protected Bootstrap createBootstrap() + { + return eventLoopThreads == 0 ? super.createBootstrap() : BootstrapFactory.newBootstrap( eventLoopThreads ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 318c7f0d55..76c6b650f3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -263,8 +263,7 @@ public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable assertThat( tx2, notNullValue() ); exception.expect( ClientException.class ); // errors differ depending of neo4j version - exception.expectMessage( - "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" ); + exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" ); tx1.run( "RETURN 1" ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index 18ea98791e..617dc5cb24 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -653,15 +653,16 @@ public void shouldFailWhenListTransformationFunctionFails() } @Test - public void shouldFailWhenServerIsRestarted() + public void shouldFailToCommitWhenServerIsRestarted() { Transaction tx = await( session.beginTransactionAsync() ); + await( tx.runAsync( "CREATE ()" ) ); + neo4j.killDb(); try { - await( tx.runAsync( "CREATE ()" ) ); await( tx.commitAsync() ); fail( "Exception expected" ); } @@ -806,7 +807,7 @@ public void shouldFailToCommitAfterTermination() } catch ( ClientException e ) { - assertEquals( "Can't commit, transaction has been terminated by `Session#reset()`", e.getMessage() ); + assertEquals( "Can't commit, transaction has been terminated", e.getMessage() ); } assertFalse( tx.isOpen() ); } @@ -924,8 +925,7 @@ public void shouldFailToRunQueryWhenTerminated() } catch ( ClientException e ) { - assertEquals( "Cannot run more statements in this transaction, it has been terminated by `Session#reset()`", - e.getMessage() ); + assertEquals( "Cannot run more statements in this transaction, it has been terminated", e.getMessage() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index b4da5c71da..f3653fab24 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.v1.integration; +import io.netty.channel.Channel; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -27,6 +28,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -39,12 +45,14 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.retry.RetrySettings.DEFAULT; public class TransactionIT { @@ -249,7 +257,7 @@ public void shouldHandleResetBeforeRun() throws Throwable { // Expect exception.expect( ClientException.class ); - exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated by" ); + exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" ); // When Transaction tx = session.beginTransaction(); session.reset(); @@ -391,7 +399,7 @@ public void shouldPropagateFailureFromSummary() @Test public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exception { - try ( Session otherSession = this.session.driver().session() ) + try ( Session otherSession = session.driver().session() ) { session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); @@ -425,7 +433,7 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForResult() throws Exc @Test public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exception { - try ( Session otherSession = this.session.driver().session() ) + try ( Session otherSession = session.driver().session() ) { session.run( "CREATE (:Person {name: 'Beta Ray Bill'})" ).consume(); @@ -458,4 +466,52 @@ public void shouldBeResponsiveToThreadInterruptWhenWaitingForCommit() throws Exc } } } + + @Test + public void shouldThrowWhenConnectionKilledDuringTransaction() + { + testFailWhenConnectionKilledDuringTransaction( false ); + } + + @Test + public void shouldThrowWhenConnectionKilledDuringTransactionMarkedForSuccess() + { + testFailWhenConnectionKilledDuringTransaction( true ); + } + + private void testFailWhenConnectionKilledDuringTransaction( boolean markForSuccess ) + { + ChannelTrackingDriverFactory factory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); + RoutingSettings instance = new RoutingSettings( 1, 0 ); + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).toConfig(); + + try ( Driver driver = factory.newInstance( session.uri(), session.authToken(), instance, DEFAULT, config ) ) + { + try ( Session session = driver.session(); + Transaction tx = session.beginTransaction() ) + { + tx.run( "CREATE (:MyNode {id: 1})" ).consume(); + + if ( markForSuccess ) + { + tx.success(); + } + + // kill all network channels + for ( Channel channel : factory.channels() ) + { + channel.close().syncUninterruptibly(); + } + + tx.run( "CREATE (:MyNode {id: 1})" ).consume(); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertThat( e.getMessage(), containsString( "Connection to the database terminated" ) ); + } + } + + assertEquals( 0, session.run( "MATCH (n:MyNode {id: 1}) RETURN count(n)" ).single().get( 0 ).asInt() ); + } } From a150643c94a933986e9a1da9872d92a8085bc6ef Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 9 Jan 2018 14:29:36 +0100 Subject: [PATCH 2/5] Simplified state check when closing transaction New condition is the same as old one by reads a bit better. --- .../java/org/neo4j/driver/internal/ExplicitTransaction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 71c66d2d98..1f33de988a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -158,7 +158,7 @@ CompletionStage closeAsync() { return commitAsync(); } - else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED ) + else if ( state != State.COMMITTED && state != State.ROLLED_BACK ) { return rollbackAsync(); } From 8351b4f5009999bdd97bd527e8a6e09e644d93a3 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 11 Jan 2018 13:55:57 +0100 Subject: [PATCH 3/5] Correctly prohibit multiple transactions in session Session can only have one active (not closed) transaction at a time. Exception is thrown when starting a transaction on a session that already has an open one. Situation when transaction is started multiple times on a session with an open transaction was not handled correctly. First failure overrode the reference to the last transaction kept in the session. Later attempts resulted in a different error saying that there already exists an open connection. This commit fixes the problem by making session keep current transaction reference even if new transaction failed to begin. So callers will receive consistent errors. --- .../neo4j/driver/internal/NetworkSession.java | 22 ++++++++- .../driver/internal/NetworkSessionTest.java | 47 +++++++++++++++++++ .../driver/v1/integration/SessionIT.java | 28 +++++++++++ 3 files changed, 95 insertions(+), 2 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index d5250e3848..802ffef7b4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -46,6 +46,7 @@ import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.types.TypeSystem; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.v1.Values.value; @@ -436,7 +437,8 @@ private CompletionStage beginTransactionAsync( AccessMode m { ensureSessionIsOpen(); - transactionStage = ensureNoOpenTxBeforeStartingTx() + // create a chain that acquires connection and starts a transaction + CompletionStage newTransactionStage = ensureNoOpenTxBeforeStartingTx() .thenCompose( ignore -> acquireConnection( mode ) ) .thenCompose( connection -> { @@ -444,7 +446,23 @@ private CompletionStage beginTransactionAsync( AccessMode m return tx.beginAsync( bookmark ); } ); - return transactionStage; + // update the reference to the only known transaction + CompletionStage currentTransactionStage = transactionStage; + + transactionStage = newTransactionStage + .exceptionally( error -> null ) // ignore errors from starting new transaction + .thenCompose( tx -> + { + if ( tx == null ) + { + // failed to begin new transaction, keep reference to the existing one + return currentTransactionStage; + } + // new transaction started, keep reference to it + return completedFuture( tx ); + } ); + + return newTransactionStage; } private CompletionStage acquireConnection( AccessMode mode ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 1f5bf6ba63..560c0635d2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -681,6 +681,53 @@ public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() verify( connection ).release(); } + @Test + public void shouldNotAllowStartingMultipleTransactions() + { + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + for ( int i = 0; i < 5; i++ ) + { + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + } + } + + @Test + public void shouldAllowStartingTransactionAfterCurrentOneIsClosed() + { + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + + tx.close(); + + assertNotNull( session.beginTransaction() ); + } + private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode ) { NetworkSession session = newSession( connectionProvider, sessionMode ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 76c6b650f3..544e9ba172 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -1623,6 +1623,34 @@ public void shouldConsumeWithFailure() } } + @Test + public void shouldNotAllowStartingMultipleTransactions() + { + try ( Session session = neo4j.driver().session() ) + { + Transaction tx = session.beginTransaction(); + assertNotNull( tx ); + + for ( int i = 0; i < 3; i++ ) + { + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + } + + tx.close(); + + assertNotNull( session.beginTransaction() ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); From b35d44c61e145114e07e71d09751086602b6c2c0 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 11 Jan 2018 17:32:07 +0100 Subject: [PATCH 4/5] Make `Session#reset()` only send RESET message Instead of sending RESET message and returning connection back to the pool. Queries & transactions should still be in control of the connection lifecycle even after reset. Previous behaviour was problematic because reset might race with query execution and make it try to use connection that has been returned to the pool. Also moved all RESET tests to a dedicated `SessionResetIT`. Made explicit transaction remain open after termination so that user code still needs to close it to return connection to the pool. --- .../driver/internal/ExplicitTransaction.java | 22 +- .../neo4j/driver/internal/NetworkSession.java | 25 +- .../internal/async/NettyConnection.java | 32 +- .../internal/async/RoutingConnection.java | 6 + .../inbound/InboundMessageDispatcher.java | 21 +- .../ChannelReleasingResetResponseHandler.java | 55 ++ .../handlers/ResetResponseHandler.java | 40 +- .../neo4j/driver/internal/spi/Connection.java | 2 + .../internal/ExplicitTransactionTest.java | 23 +- .../driver/internal/NetworkSessionTest.java | 31 +- .../async/ChannelConnectorImplTest.java | 5 +- .../internal/async/NettyConnectionTest.java | 150 ++- .../inbound/InboundMessageDispatcherTest.java | 71 +- ...nnelReleasingResetResponseHandlerTest.java | 130 +++ .../handlers/ResetResponseHandlerTest.java | 100 +- .../util/FailingConnectionDriverFactory.java | 6 + .../driver/v1/integration/SessionIT.java | 429 -------- .../driver/v1/integration/SessionResetIT.java | 922 ++++++++++++++++++ .../v1/integration/TransactionAsyncIT.java | 27 +- .../driver/v1/integration/TransactionIT.java | 94 -- .../org/neo4j/driver/v1/util/TestUtil.java | 53 +- 21 files changed, 1533 insertions(+), 711 deletions(-) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 1f33de988a..00740f0177 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -59,32 +59,25 @@ public class ExplicitTransaction implements Transaction private enum State { /** The transaction is running with no explicit success or failure marked */ - ACTIVE( true ), + ACTIVE, /** Running, user marked for success, meaning it'll value committed */ - MARKED_SUCCESS( true ), + MARKED_SUCCESS, /** User marked as failed, meaning it'll be rolled back. */ - MARKED_FAILED( true ), + MARKED_FAILED, /** * This transaction has been terminated either because of explicit {@link Session#reset()} or because of a * fatal connection error. */ - TERMINATED( false ), + TERMINATED, /** This transaction has successfully committed */ - COMMITTED( false ), + COMMITTED, /** This transaction has been rolled back */ - ROLLED_BACK( false ); - - final boolean txOpen; - - State( boolean txOpen ) - { - this.txOpen = txOpen; - } + ROLLED_BACK } private final Connection connection; @@ -181,6 +174,7 @@ else if ( state == State.ROLLED_BACK ) } else if ( state == State.TERMINATED ) { + transactionClosed( State.ROLLED_BACK ); return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) ); } else @@ -320,7 +314,7 @@ else if ( state == State.TERMINATED ) @Override public boolean isOpen() { - return state.txOpen; + return state != State.COMMITTED && state != State.ROLLED_BACK; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 802ffef7b4..1a90f3d5d7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -258,13 +258,24 @@ public void reset() private CompletionStage resetAsync() { - return existingTransactionOrNull().thenAccept( tx -> - { - if ( tx != null ) - { - tx.markTerminated(); - } - } ).thenCompose( ignore -> releaseConnection() ); + return existingTransactionOrNull() + .thenAccept( tx -> + { + if ( tx != null ) + { + tx.markTerminated(); + } + } ) + .thenCompose( ignore -> connectionStage ) + .thenCompose( connection -> + { + if ( connection != null ) + { + // there exists an active connection, send a RESET message over it + return connection.reset(); + } + return completedWithNull(); + } ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java index cf3d2b2211..3d3bb0c9e6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java @@ -28,6 +28,7 @@ import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler; import org.neo4j.driver.internal.handlers.ResetResponseHandler; import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.PullAllMessage; @@ -39,6 +40,7 @@ import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.Value; +import static java.util.Collections.emptyMap; import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason; public class NettyConnection implements Connection @@ -108,15 +110,24 @@ public void runAndFlush( String statement, Map parameters, Respons } } + @Override + public CompletionStage reset() + { + CompletableFuture result = new CompletableFuture<>(); + ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result ); + writeResetMessageIfNeeded( handler, true ); + return result; + } + @Override public CompletionStage release() { if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) ) { - // auto-read could've been disabled, re-enable it to automatically receive response for RESET - setAutoRead( true ); + ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel, + channelPool, messageDispatcher, clock, releaseFuture ); - reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) ); + writeResetMessageIfNeeded( handler, false ); } return releaseFuture; } @@ -152,12 +163,21 @@ private void run( String statement, Map parameters, ResponseHandle pullAllHandler, flush ); } - private void reset( ResponseHandler resetHandler ) + private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean isSessionReset ) { channel.eventLoop().execute( () -> { - messageDispatcher.muteAckFailure(); - writeAndFlushMessage( ResetMessage.RESET, resetHandler ); + if ( isSessionReset && !isOpen() ) + { + resetHandler.onSuccess( emptyMap() ); + } + else + { + messageDispatcher.muteAckFailure(); + // auto-read could've been disabled, re-enable it to automatically receive response for RESET + setAutoRead( true ); + writeAndFlushMessage( ResetMessage.RESET, resetHandler ); + } } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java index 264a4a61ad..b1f35ab26e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java @@ -71,6 +71,12 @@ public void runAndFlush( String statement, Map parameters, Respons newRoutingResponseHandler( pullAllHandler ) ); } + @Override + public CompletionStage reset() + { + return delegate.reset(); + } + @Override public boolean isOpen() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java index 88df51a64b..a0571c6dad 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java @@ -33,6 +33,7 @@ import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE; @@ -157,14 +158,24 @@ public void handleIgnoredMessage() log.debug( "S: IGNORED" ); ResponseHandler handler = handlers.remove(); + + Throwable error; if ( currentError != null ) { - handler.onFailure( currentError ); + error = currentError; + } + else if ( ackFailureMuted ) + { + error = new ClientException( "Database ignored the request because session has been reset" ); } else { - log.warn( "Received IGNORED message for handler %s but error is missing", handler ); + log.warn( "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " + + "Current handlers %s", handler, handlers ); + + error = new ClientException( "Database ignored the request" ); } + handler.onFailure( error ); } public void handleFatalError( Throwable error ) @@ -212,15 +223,9 @@ public void muteAckFailure() * {@link #muteAckFailure()} when sending RESET message. *

* This method is not thread-safe and should only be executed by the event loop thread. - * - * @throws IllegalStateException if ACK_FAILURE is not muted right now. */ public void unMuteAckFailure() { - if ( !ackFailureMuted ) - { - throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" ); - } ackFailureMuted = false; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java new file mode 100644 index 0000000000..5b2611240f --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import io.netty.channel.Channel; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; + +import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; + +public class ChannelReleasingResetResponseHandler extends ResetResponseHandler +{ + private final Channel channel; + private final ChannelPool pool; + private final Clock clock; + + public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool, + InboundMessageDispatcher messageDispatcher, Clock clock, CompletableFuture releaseFuture ) + { + super( messageDispatcher, releaseFuture ); + this.channel = channel; + this.pool = pool; + this.clock = clock; + } + + @Override + protected void resetCompleted( CompletableFuture completionFuture ) + { + setLastUsedTimestamp( channel, clock.millis() ); + + Future released = pool.release( channel ); + released.addListener( ignore -> completionFuture.complete( null ) ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java index bc57e3f8c4..ad01eb85db 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java @@ -18,62 +18,50 @@ */ package org.neo4j.driver.internal.handlers; -import io.netty.channel.Channel; -import io.netty.channel.pool.ChannelPool; -import io.netty.util.concurrent.Future; - import java.util.Map; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.internal.spi.ResponseHandler; -import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.Value; -import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp; - public class ResetResponseHandler implements ResponseHandler { - private final Channel channel; - private final ChannelPool pool; private final InboundMessageDispatcher messageDispatcher; - private final Clock clock; - private final CompletableFuture releaseFuture; + private final CompletableFuture completionFuture; - public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher, - Clock clock, CompletableFuture releaseFuture ) + public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, CompletableFuture completionFuture ) { - this.channel = channel; - this.pool = pool; this.messageDispatcher = messageDispatcher; - this.clock = clock; - this.releaseFuture = releaseFuture; + this.completionFuture = completionFuture; } @Override - public void onSuccess( Map metadata ) + public final void onSuccess( Map metadata ) { - releaseChannel(); + resetCompleted(); } @Override - public void onFailure( Throwable error ) + public final void onFailure( Throwable error ) { - releaseChannel(); + resetCompleted(); } @Override - public void onRecord( Value[] fields ) + public final void onRecord( Value[] fields ) { throw new UnsupportedOperationException(); } - private void releaseChannel() + private void resetCompleted() { messageDispatcher.unMuteAckFailure(); - setLastUsedTimestamp( channel, clock.millis() ); + resetCompleted( completionFuture ); + } - Future released = pool.release( channel ); - released.addListener( ignore -> releaseFuture.complete( null ) ); + protected void resetCompleted( CompletableFuture completionFuture ) + { + completionFuture.complete( null ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index adbb83bb0f..d537f54360 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -39,6 +39,8 @@ void run( String statement, Map parameters, ResponseHandler runHan void runAndFlush( String statement, Map parameters, ResponseHandler runHandler, ResponseHandler pullAllHandler ); + CompletionStage reset(); + CompletionStage release(); void terminateAndRelease( String reason ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index c1366eac51..4c3e42bbe6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -26,6 +26,7 @@ import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; import static java.util.Collections.emptyMap; import static org.junit.Assert.assertEquals; @@ -156,7 +157,7 @@ public void shouldBeClosedWhenMarkedAsTerminated() tx.markTerminated(); - assertFalse( tx.isOpen() ); + assertTrue( tx.isOpen() ); } @Test @@ -256,6 +257,26 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds() verify( connection, never() ).release(); } + @Test + public void shouldReleaseConnectionWhenTerminatedAndCommitted() + { + Connection connection = connectionMock(); + ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) ); + + tx.markTerminated(); + try + { + await( tx.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException ignore ) + { + } + + assertFalse( tx.isOpen() ); + verify( connection ).release(); + } + @Test public void shouldReleaseConnectionWhenTerminatedAndRolledBack() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 560c0635d2..f1a4681760 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -51,7 +51,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -91,6 +90,7 @@ public void setUp() { connection = connectionMock(); when( connection.release() ).thenReturn( completedWithNull() ); + when( connection.reset() ).thenReturn( completedWithNull() ); when( connection.serverAddress() ).thenReturn( BoltServerAddress.LOCAL_DEFAULT ); when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 ); connectionProvider = mock( ConnectionProvider.class ); @@ -487,27 +487,17 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows() } @Test - public void connectionShouldBeReleasedAfterSessionReset() + public void connectionShouldBeResetAfterSessionReset() { NetworkSession session = newSession( connectionProvider, READ ); session.run( "RETURN 1" ); + verify( connection, never() ).reset(); verify( connection, never() ).release(); session.reset(); - verify( connection ).release(); - } - - @Test - public void transactionShouldBeRolledBackAfterSessionReset() - { - NetworkSession session = newSession( connectionProvider, READ ); - Transaction tx = session.beginTransaction(); - - assertTrue( tx.isOpen() ); - - session.reset(); - assertFalse( tx.isOpen() ); + verify( connection ).reset(); + verify( connection, never() ).release(); } @Test @@ -663,22 +653,17 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() } @Test - public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset() + public void shouldMarkTransactionAsTerminatedAndThenResetConnectionOnReset() { NetworkSession session = newSession( connectionProvider, READ ); Transaction tx = session.beginTransaction(); assertTrue( tx.isOpen() ); - when( connection.release() ).then( invocation -> - { - // verify that tx is not open when connection is released - assertFalse( tx.isOpen() ); - return completedWithNull(); - } ); + verify( connection, never() ).reset(); session.reset(); - verify( connection ).release(); + verify( connection ).reset(); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java index 16d7a39003..883ee0e37b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectorImplTest.java @@ -27,8 +27,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.RuleChain; -import org.junit.rules.Timeout; import java.io.IOException; import java.net.ConnectException; @@ -61,9 +59,8 @@ public class ChannelConnectorImplTest { - private final TestNeo4j neo4j = new TestNeo4j(); @Rule - public final RuleChain ruleChain = RuleChain.outerRule( Timeout.seconds( 60 ) ).around( neo4j ); + public final TestNeo4j neo4j = new TestNeo4j(); private Bootstrap bootstrap; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java index 471b9592cc..b96f39b6e8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/NettyConnectionTest.java @@ -29,6 +29,7 @@ import org.mockito.ArgumentCaptor; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,6 +55,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.setMessageDispatcher; import static org.neo4j.driver.internal.async.ChannelAttributes.terminationReason; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; @@ -77,14 +79,14 @@ public void tearDown() throws Exception @Test public void shouldBeOpenAfterCreated() { - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); assertTrue( connection.isOpen() ); } @Test public void shouldNotBeOpenAfterRelease() { - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); assertFalse( connection.isOpen() ); } @@ -92,10 +94,7 @@ public void shouldNotBeOpenAfterRelease() @Test public void shouldSendResetOnRelease() { - EmbeddedChannel channel = new EmbeddedChannel(); - InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); - + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); connection.release(); @@ -128,19 +127,21 @@ public void shouldWriteForceReleaseInEventLoopThread() throws Exception @Test public void shouldEnableAutoReadWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( false ); NettyConnection connection = newConnection( channel ); connection.release(); + channel.runPendingTasks(); + assertTrue( channel.config().isAutoRead() ); } @Test public void shouldNotDisableAutoReadWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( true ); NettyConnection connection = newConnection( channel ); @@ -155,7 +156,7 @@ public void shouldNotRunWhenReleased() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -170,7 +171,7 @@ public void shouldNotRunAndFlushWhenReleased() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.release(); connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -185,7 +186,7 @@ public void shouldNotRunWhenTerminated() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.terminateAndRelease( "42" ); connection.run( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -200,7 +201,7 @@ public void shouldNotRunAndFlushWhenTerminated() { ResponseHandler runHandler = mock( ResponseHandler.class ); ResponseHandler pullAllHandler = mock( ResponseHandler.class ); - NettyConnection connection = newConnection( new EmbeddedChannel() ); + NettyConnection connection = newConnection( newChannel() ); connection.terminateAndRelease( "42" ); connection.runAndFlush( "RETURN 1", emptyMap(), runHandler, pullAllHandler ); @@ -213,7 +214,7 @@ public void shouldNotRunAndFlushWhenTerminated() @Test public void shouldReturnServerAddressWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); BoltServerAddress address = new BoltServerAddress( "host", 4242 ); ChannelAttributes.setServerAddress( channel, address ); @@ -226,7 +227,7 @@ public void shouldReturnServerAddressWhenReleased() @Test public void shouldReturnServerVersionWhenReleased() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ServerVersion version = ServerVersion.v3_2_0; ChannelAttributes.setServerVersion( channel, version ); @@ -239,10 +240,7 @@ public void shouldReturnServerVersionWhenReleased() @Test public void shouldReturnSameCompletionStageFromRelease() { - EmbeddedChannel channel = new EmbeddedChannel(); - InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); - ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); - + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); CompletionStage releaseStage1 = connection.release(); @@ -263,7 +261,7 @@ public void shouldReturnSameCompletionStageFromRelease() @Test public void shouldEnableAutoRead() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( false ); NettyConnection connection = newConnection( channel ); @@ -275,7 +273,7 @@ public void shouldEnableAutoRead() @Test public void shouldDisableAutoRead() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); channel.config().setAutoRead( true ); NettyConnection connection = newConnection( channel ); @@ -287,7 +285,7 @@ public void shouldDisableAutoRead() @Test public void shouldSetTerminationReasonOnChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); String reason = "Something really bad has happened"; @@ -299,7 +297,7 @@ public void shouldSetTerminationReasonOnChannelWhenTerminated() @Test public void shouldCloseChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); NettyConnection connection = newConnection( channel ); assertTrue( channel.isActive() ); @@ -311,7 +309,7 @@ public void shouldCloseChannelWhenTerminated() @Test public void shouldReleaseChannelWhenTerminated() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -324,7 +322,7 @@ public void shouldReleaseChannelWhenTerminated() @Test public void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -342,7 +340,7 @@ public void shouldNotReleaseChannelMultipleTimesWhenTerminatedMultipleTimes() @Test public void shouldNotReleaseAfterTermination() { - EmbeddedChannel channel = new EmbeddedChannel(); + EmbeddedChannel channel = newChannel(); ChannelPool pool = mock( ChannelPool.class ); NettyConnection connection = newConnection( channel, pool ); verify( pool, never() ).release( any() ); @@ -356,6 +354,93 @@ public void shouldNotReleaseAfterTermination() verify( pool ).release( channel ); } + @Test + public void shouldSendResetMessageWhenReset() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + assertEquals( 1, channel.outboundMessages().size() ); + assertEquals( RESET, channel.readOutbound() ); + } + + @Test + public void shouldCompleteResetFutureWhenSuccessResponseArrives() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + assertFalse( resetFuture.isDone() ); + + messageDispatcher( channel ).handleSuccessMessage( emptyMap() ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldCompleteResetFutureWhenFailureResponseArrives() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + assertFalse( resetFuture.isDone() ); + + messageDispatcher( channel ).handleFailureMessage( "Neo.TransientError.Transaction.Terminated", "Message" ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldDoNothingInResetWhenClosed() + { + EmbeddedChannel channel = newChannel(); + NettyConnection connection = newConnection( channel ); + + connection.release(); + channel.runPendingTasks(); + + CompletableFuture resetFuture = connection.reset().toCompletableFuture(); + channel.runPendingTasks(); + + assertEquals( 1, channel.outboundMessages().size() ); + assertEquals( RESET, channel.readOutbound() ); + assertTrue( resetFuture.isDone() ); + assertFalse( resetFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldMuteAckFailureWhenReset() + { + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + EmbeddedChannel channel = newChannel( messageDispatcher ); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + verify( messageDispatcher ).muteAckFailure(); + } + + @Test + public void shouldEnableAutoReadWhenDoingReset() + { + EmbeddedChannel channel = newChannel(); + channel.config().setAutoRead( false ); + NettyConnection connection = newConnection( channel ); + + connection.reset(); + channel.runPendingTasks(); + + assertTrue( channel.config().isAutoRead() ); + } + private void testWriteInEventLoop( String threadName, Consumer action ) throws Exception { EmbeddedChannel channel = spy( new EmbeddedChannel() ); @@ -390,6 +475,21 @@ private void shutdownEventLoop() throws Exception } } + private static EmbeddedChannel newChannel() + { + EmbeddedChannel channel = new EmbeddedChannel(); + InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ); + ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + return channel; + } + + private static EmbeddedChannel newChannel( InboundMessageDispatcher messageDispatcher ) + { + EmbeddedChannel channel = new EmbeddedChannel(); + ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + return channel; + } + private static NettyConnection newConnection( Channel channel ) { return newConnection( channel, mock( ChannelPool.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java index ca55f85006..104d72be3a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java @@ -30,8 +30,10 @@ import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.Neo4jException; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -148,19 +150,18 @@ public void shouldNotSendAckFailureOnFailureWhenMuted() } @Test - public void shouldFailToUnMuteAckFailureWhenNotMuted() + public void shouldUnMuteAckFailureWhenNotMuted() { - InboundMessageDispatcher dispatcher = newDispatcher( mock( Channel.class ) ); + Channel channel = mock( Channel.class ); + InboundMessageDispatcher dispatcher = newDispatcher( channel ); - try - { - dispatcher.unMuteAckFailure(); - fail( "Exception expected" ); - } - catch ( IllegalStateException e ) - { - assertEquals( "Can't un-mute ACK_FAILURE because it's not muted", e.getMessage() ); - } + dispatcher.unMuteAckFailure(); + + dispatcher.queue( mock( ResponseHandler.class ) ); + assertEquals( 1, dispatcher.queuedHandlersCount() ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verify( channel ).writeAndFlush( eq( ACK_FAILURE ), any() ); } @Test @@ -268,7 +269,49 @@ public void shouldDequeHandlerOnIgnored() dispatcher.handleIgnoredMessage(); assertEquals( 0, dispatcher.queuedHandlersCount() ); - verifyZeroInteractions( handler ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWithExistingError() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler1 = mock( ResponseHandler.class ); + ResponseHandler handler2 = mock( ResponseHandler.class ); + + dispatcher.queue( handler1 ); + dispatcher.queue( handler2 ); + + dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE ); + verifyFailure( handler1 ); + verifyZeroInteractions( handler2 ); + + dispatcher.handleIgnoredMessage(); + verifyFailure( handler2 ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWhenHandlingReset() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler = mock( ResponseHandler.class ); + dispatcher.queue( handler ); + + dispatcher.muteAckFailure(); + dispatcher.handleIgnoredMessage(); + + verify( handler ).onFailure( any( ClientException.class ) ); + } + + @Test + public void shouldFailHandlerOnIgnoredMessageWhenNoErrorAndNotHandlingReset() + { + InboundMessageDispatcher dispatcher = newDispatcher(); + ResponseHandler handler = mock( ResponseHandler.class ); + dispatcher.queue( handler ); + + dispatcher.handleIgnoredMessage(); + + verify( handler ).onFailure( any( ClientException.class ) ); } @Test @@ -296,7 +339,7 @@ public void shouldNotSupportInitMessage() try { - dispatcher.handleInitMessage( "Client", Collections.emptyMap() ); + dispatcher.handleInitMessage( "Client", emptyMap() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -312,7 +355,7 @@ public void shouldNotSupportRunMessage() try { - dispatcher.handleRunMessage( "RETURN 1", Collections.emptyMap() ); + dispatcher.handleRunMessage( "RETURN 1", emptyMap() ); fail( "Exception expected" ); } catch ( Exception e ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java new file mode 100644 index 0000000000..1d2ef5001d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ChannelReleasingResetResponseHandlerTest.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.junit.After; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.FakeClock; + +import static java.util.Collections.emptyMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; + +public class ChannelReleasingResetResponseHandlerTest +{ + private final EmbeddedChannel channel = new EmbeddedChannel(); + private final InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + + @After + public void tearDown() + { + channel.finishAndReleaseAll(); + } + + @Test + public void shouldReleaseChannelOnSuccess() + { + ChannelPool pool = newChannelPoolMock(); + FakeClock clock = new FakeClock(); + clock.progress( 5 ); + CompletableFuture releaseFuture = new CompletableFuture<>(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + + handler.onSuccess( emptyMap() ); + + verifyLastUsedTimestamp( 5 ); + verify( pool ).release( eq( channel ) ); + assertTrue( releaseFuture.isDone() ); + assertFalse( releaseFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldReleaseChannelOnFailure() + { + ChannelPool pool = newChannelPoolMock(); + FakeClock clock = new FakeClock(); + clock.progress( 100 ); + CompletableFuture releaseFuture = new CompletableFuture<>(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + + handler.onFailure( new RuntimeException() ); + + verifyLastUsedTimestamp( 100 ); + verify( pool ).release( eq( channel ) ); + assertTrue( releaseFuture.isDone() ); + assertFalse( releaseFuture.isCompletedExceptionally() ); + } + + @Test + public void shouldUnMuteAckFailureOnSuccess() + { + ChannelPool pool = newChannelPoolMock(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + + handler.onSuccess( emptyMap() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + + @Test + public void shouldUnMuteAckFailureOnFailure() + { + ChannelPool pool = newChannelPoolMock(); + ChannelReleasingResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + + handler.onFailure( new RuntimeException() ); + + verify( messageDispatcher ).unMuteAckFailure(); + } + + private void verifyLastUsedTimestamp( int expectedValue ) + { + assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); + } + + private ChannelReleasingResetResponseHandler newHandler( ChannelPool pool, Clock clock, + CompletableFuture releaseFuture ) + { + return new ChannelReleasingResetResponseHandler( channel, pool, messageDispatcher, clock, releaseFuture ); + } + + private static ChannelPool newChannelPoolMock() + { + ChannelPool pool = mock( ChannelPool.class ); + Future releasedFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture( null ); + when( pool.release( any() ) ).thenReturn( releasedFuture ); + return pool; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java index 3e87ab1376..92837f4b88 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/ResetResponseHandlerTest.java @@ -18,112 +18,96 @@ */ package org.neo4j.driver.internal.handlers; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.channel.pool.ChannelPool; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; -import org.junit.After; import org.junit.Test; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.internal.util.FakeClock; import static java.util.Collections.emptyMap; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.neo4j.driver.internal.async.ChannelAttributes.lastUsedTimestamp; +import static org.neo4j.driver.v1.Values.values; public class ResetResponseHandlerTest { - private final EmbeddedChannel channel = new EmbeddedChannel(); - private final InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); - - @After - public void tearDown() - { - channel.finishAndReleaseAll(); - } - @Test - public void shouldReleaseChannelOnSuccess() + public void shouldCompleteFutureOnSuccess() throws Exception { - ChannelPool pool = newChannelPoolMock(); - FakeClock clock = new FakeClock(); - clock.progress( 5 ); - CompletableFuture releaseFuture = new CompletableFuture<>(); - ResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + CompletableFuture future = new CompletableFuture<>(); + ResetResponseHandler handler = newHandler( future ); + + assertFalse( future.isDone() ); handler.onSuccess( emptyMap() ); - verifyLastUsedTimestamp( 5 ); - verify( pool ).release( eq( channel ) ); - assertTrue( releaseFuture.isDone() ); - assertFalse( releaseFuture.isCompletedExceptionally() ); + assertTrue( future.isDone() ); + assertNull( future.get() ); } @Test - public void shouldReleaseChannelOnFailure() + public void shouldUnMuteAckFailureOnSuccess() { - ChannelPool pool = newChannelPoolMock(); - FakeClock clock = new FakeClock(); - clock.progress( 100 ); - CompletableFuture releaseFuture = new CompletableFuture<>(); - ResetResponseHandler handler = newHandler( pool, clock, releaseFuture ); + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + ResetResponseHandler handler = newHandler( messageDispatcher, new CompletableFuture<>() ); - handler.onFailure( new RuntimeException() ); + handler.onSuccess( emptyMap() ); - verifyLastUsedTimestamp( 100 ); - verify( pool ).release( eq( channel ) ); - assertTrue( releaseFuture.isDone() ); - assertFalse( releaseFuture.isCompletedExceptionally() ); + verify( messageDispatcher ).unMuteAckFailure(); } @Test - public void shouldUnMuteAckFailureOnSuccess() + public void shouldCompleteFutureOnFailure() throws Exception { - ChannelPool pool = newChannelPoolMock(); - ResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + CompletableFuture future = new CompletableFuture<>(); + ResetResponseHandler handler = newHandler( future ); - handler.onSuccess( emptyMap() ); + assertFalse( future.isDone() ); - verify( messageDispatcher ).unMuteAckFailure(); + handler.onFailure( new RuntimeException() ); + + assertTrue( future.isDone() ); + assertNull( future.get() ); } @Test public void shouldUnMuteAckFailureOnFailure() { - ChannelPool pool = newChannelPoolMock(); - ResetResponseHandler handler = newHandler( pool, new FakeClock(), new CompletableFuture<>() ); + InboundMessageDispatcher messageDispatcher = mock( InboundMessageDispatcher.class ); + ResetResponseHandler handler = newHandler( messageDispatcher, new CompletableFuture<>() ); handler.onFailure( new RuntimeException() ); verify( messageDispatcher ).unMuteAckFailure(); } - private void verifyLastUsedTimestamp( int expectedValue ) + @Test + public void shouldThrowWhenOnRecord() { - assertEquals( expectedValue, lastUsedTimestamp( channel ).intValue() ); + ResetResponseHandler handler = newHandler( new CompletableFuture<>() ); + + try + { + handler.onRecord( values( 1, 2, 3 ) ); + fail( "Exception expected" ); + } + catch ( UnsupportedOperationException ignore ) + { + } } - private ResetResponseHandler newHandler( ChannelPool pool, Clock clock, CompletableFuture releaseFuture ) + private static ResetResponseHandler newHandler( CompletableFuture future ) { - return new ResetResponseHandler( channel, pool, messageDispatcher, clock, releaseFuture ); + return new ResetResponseHandler( mock( InboundMessageDispatcher.class ), future ); } - private static ChannelPool newChannelPoolMock() + private static ResetResponseHandler newHandler( InboundMessageDispatcher messageDispatcher, + CompletableFuture future ) { - ChannelPool pool = mock( ChannelPool.class ); - Future releasedFuture = ImmediateEventExecutor.INSTANCE.newSucceededFuture( null ); - when( pool.release( any() ) ).thenReturn( releasedFuture ); - return pool; + return new ResetResponseHandler( messageDispatcher, future ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java index 559b3aa101..cb7ec6cba2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FailingConnectionDriverFactory.java @@ -140,6 +140,12 @@ public void runAndFlush( String statement, Map parameters, Respons delegate.runAndFlush( statement, parameters, runHandler, pullAllHandler ); } + @Override + public CompletionStage reset() + { + return delegate.reset(); + } + @Override public CompletionStage release() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 544e9ba172..4b45abc90e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -18,7 +18,6 @@ */ package org.neo4j.driver.v1.integration; -import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -38,7 +37,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; @@ -60,7 +58,6 @@ import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.exceptions.AuthenticationException; import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.summary.ResultSummary; @@ -71,13 +68,9 @@ import static java.lang.String.format; import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyArray; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; @@ -165,280 +158,6 @@ public void shouldHandleNullAuthToken() throws Throwable driver = GraphDatabase.driver( neo4j.uri(), token ); } - @Test - public void shouldKillLongRunningStatement() throws Throwable - { - neo4j.ensureProcedures( "longRunningStatement.jar" ); - // Given - int executionTimeout = 10; // 10s - final int killTimeout = 1; // 1s - long startTime = -1, endTime; - - try ( Session session = neo4j.driver().session() ) - { - StatementResult result = - session.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", executionTimeout ) ); - - resetSessionAfterTimeout( session, killTimeout ); - - // When - startTime = System.currentTimeMillis(); - result.consume();// blocking to run the statement - - fail( "Should have got an exception about statement get killed." ); - } - catch ( Neo4jException e ) - { - endTime = System.currentTimeMillis(); - assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset - assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished - } - catch ( Exception e ) - { - fail( "Should be a Neo4jException" ); - } - } - - @Test - public void shouldKillLongStreamingResult() throws Throwable - { - neo4j.ensureProcedures( "longRunningStatement.jar" ); - // Given - int executionTimeout = 10; // 10s - final int killTimeout = 1; // 1s - long startTime = -1, endTime; - int recordCount = 0; - - try ( final Session session = neo4j.driver().session() ) - { - StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", - parameters( "seconds", executionTimeout ) ); - - resetSessionAfterTimeout( session, killTimeout ); - - // When - startTime = System.currentTimeMillis(); - while ( result.hasNext() ) - { - result.next(); - recordCount++; - } - - fail( "Should have got an exception about streaming get killed." ); - } - catch ( ClientException e ) - { - endTime = System.currentTimeMillis(); - assertThat( e.code(), equalTo( "Neo.ClientError.Procedure.ProcedureCallFailed" ) ); - assertThat( recordCount, greaterThan( 1 ) ); - - assertTrue( startTime > 0 ); - assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset - assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - try ( Session session = neo4j.driver().session() ) - { - Transaction tx1 = session.beginTransaction(); - - tx1.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - // When - Transaction tx2 = session.beginTransaction(); - - // Then - assertThat( tx2, notNullValue() ); - - exception.expect( ClientException.class ); // errors differ depending of neo4j version - exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" ); - - tx1.run( "RETURN 1" ); - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - Session session = neo4j.driver().session(); - session.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - exception.expect( Neo4jException.class ); // errors differ depending of neo4j version - exception.expectMessage( containsString( "The transaction has been terminated" ) ); - - // When & Then - session.close(); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable - { - // Given - neo4j.ensureProcedures( "longRunningStatement.jar" ); - - try ( Session session = neo4j.driver().session() ) - { - Transaction tx = session.beginTransaction(); - - StatementResult procedureResult = tx.run( "CALL test.driver.longRunningStatement({seconds})", - parameters( "seconds", 10 ) ); - Thread.sleep( 1000 ); - session.reset(); - - try - { - procedureResult.consume(); - fail( "Should procedure throw an exception as we interrupted procedure call" ); - } - catch ( Neo4jException e ) - { - assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); - } - catch ( Throwable e ) - { - fail( "Expected exception is different from what we've received: " + e.getMessage() ); - } - - // When - tx = session.beginTransaction(); - tx.run( "CREATE (n:FirstNode)" ); - tx.success(); - tx.close(); - - // Then - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - MatcherAssert.assertThat( nodes, equalTo( 1L ) ); - } - } - - @SuppressWarnings( "deprecation" ) - private void resetSessionAfterTimeout( final Session session, final int timeout ) - { - new Thread( () -> - { - try - { - Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds - } - catch ( InterruptedException e ) - { - e.printStackTrace(); - } - finally - { - session.reset(); // reset the session after timeout - } - } ).start(); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreStatementAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - - session.run( "Return 1" ).consume(); - - // When reset the state of this session - session.reset(); - - // Then can run successfully more statements without any error - session.run( "Return 2" ).consume(); - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreTxAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 1" ); - tx.success(); - } - - // When reset the state of this session - session.reset(); - - // Then can run more Tx - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 2" ); - tx.success(); - } - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - // When reset the state of this session - session.reset(); - // Then - tx.run( "Return 1" ); - fail( "Should not allow tx run as tx is already failed." ); - } - catch ( Exception e ) - { - assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); - } - } - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldAllowMoreTxAfterSessionResetInTx() - { - // Given - try ( Session session = neo4j.driver().session() ) - { - try ( Transaction tx = session.beginTransaction() ) - { - // When reset the state of this session - session.reset(); - } - - // Then can run more Tx - try ( Transaction tx = session.beginTransaction() ) - { - tx.run( "Return 2" ); - tx.success(); - } - } - } - @Test public void executeReadTxInReadSession() { @@ -891,80 +610,6 @@ public void writeTxRolledBackWhenMarkedAsSuccessAndThrowsException() } } - @Test - public void resetShouldStopQueryWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = driver.session() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - StatementResult result = updateNodeId( session, nodeId, newNodeId ); - result.consume(); - } - } - } ); - } - - @Test - public void resetShouldStopTransactionWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - public void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = neo4j.driver().session(); - Transaction tx = session.beginTransaction() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - } - } - } ); - } - - @Test - public void resetShouldStopWriteTransactionWaitingForALock() throws Exception - { - assumeServerIs31OrLater(); - final AtomicInteger invocationsOfWork = new AtomicInteger(); - - testResetOfQueryWaitingForLock( new NodeIdUpdater() - { - @Override - public void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception - { - try ( Session session = driver.session() ) - { - usedSessionRef.set( session ); - latchToWait.await(); - - session.writeTransaction( tx -> - { - invocationsOfWork.incrementAndGet(); - StatementResult result = updateNodeId( tx, nodeId, newNodeId ); - result.consume(); - return null; - } ); - } - } - } ); - - assertEquals( 1, invocationsOfWork.get() ); - } - @Test public void transactionRunShouldFailOnDeadlocks() throws Exception { @@ -1651,13 +1296,6 @@ public void shouldNotAllowStartingMultipleTransactions() } } - private void assumeServerIs31OrLater() - { - ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); - assumeTrue( "Ignored on `" + serverVersion + "`", - serverVersion.greaterThanOrEqual( v3_1_0 ) ); - } - private void testExecuteReadTx( AccessMode sessionMode ) { Driver driver = neo4j.driver(); @@ -1746,43 +1384,6 @@ private void testTxRollbackWhenFunctionThrows( AccessMode sessionMode ) } - @SuppressWarnings( "deprecation" ) - private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception - { - int nodeId = 42; - int newNodeId1 = 4242; - int newNodeId2 = 424242; - - createNodeWithId( nodeId ); - - CountDownLatch nodeLocked = new CountDownLatch( 1 ); - AtomicReference otherSessionRef = new AtomicReference<>(); - - try ( Session session = neo4j.driver().session(); - Transaction tx = session.beginTransaction() ) - { - Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); - - StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); - result.consume(); - tx.success(); - - nodeLocked.countDown(); - // give separate thread some time to block on a lock - Thread.sleep( 2_000 ); - otherSessionRef.get().reset(); - - assertTransactionTerminated( txResult ); - } - - try ( Session session = neo4j.driver().session() ) - { - StatementResult result = session.run( "MATCH (n) RETURN n.id AS id" ); - int value = result.single().get( "id" ).asInt(); - assertEquals( newNodeId2, value ); - } - } - private Driver newDriverWithoutRetries() { return newDriverWithFixedRetries( 0 ); @@ -1845,20 +1446,6 @@ private static StatementResult updateNodeId( StatementRunner statementRunner, in parameters( "currentId", currentId, "newId", newId ) ); } - private static void assertTransactionTerminated( Future work ) throws Exception - { - try - { - work.get( 20, TimeUnit.SECONDS ); - fail( "Exception expected" ); - } - catch ( ExecutionException e ) - { - assertThat( e.getCause(), instanceOf( TransientException.class ) ); - assertThat( e.getCause().getMessage(), startsWith( "The transaction has been terminated" ) ); - } - } - private static boolean assertOneOfTwoFuturesFailWithDeadlock( Future future1, Future future2 ) throws Exception { @@ -1914,22 +1501,6 @@ private static void await( CountDownLatch latch ) } } - private abstract class NodeIdUpdater - { - final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, - CountDownLatch latchToWait ) - { - return executeInDifferentThread( () -> - { - performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); - return null; - } ); - } - - abstract void performUpdate( Driver driver, int nodeId, int newNodeId, - AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception; - } - private static class ThrowingWork implements TransactionWork { final String query; diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java new file mode 100644 index 0000000000..ff66de026d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionResetIT.java @@ -0,0 +1,922 @@ +/* + * Copyright (c) 2002-2018 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1.integration; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.channels.ClosedChannelException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.neo4j.driver.internal.util.ServerVersion; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.StatementRunner; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.Neo4jException; +import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.util.TestNeo4j; + +import static java.util.Collections.newSetFromMap; +import static java.util.concurrent.CompletableFuture.runAsync; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.IntStream.range; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.util.ServerVersion.v3_1_0; +import static org.neo4j.driver.v1.Values.parameters; +import static org.neo4j.driver.v1.util.DaemonThreadFactory.daemon; +import static org.neo4j.driver.v1.util.Neo4jRunner.HOME_DIR; +import static org.neo4j.driver.v1.util.Neo4jSettings.IMPORT_DIR; +import static org.neo4j.driver.v1.util.Neo4jSettings.TEST_SETTINGS; +import static org.neo4j.driver.v1.util.TestUtil.activeQueryCount; +import static org.neo4j.driver.v1.util.TestUtil.activeQueryNames; +import static org.neo4j.driver.v1.util.TestUtil.awaitCondition; + +@SuppressWarnings( "deprecation" ) +public class SessionResetIT +{ + private static final int CSV_FILE_SIZE = 10_000; + private static final int LOAD_CSV_BATCH_SIZE = 10; + + private static final String SHORT_QUERY_1 = "CREATE (n:Node {name: 'foo', occupation: 'bar'})"; + private static final String SHORT_QUERY_2 = "MATCH (n:Node {name: 'foo'}) RETURN count(n)"; + private static final String LONG_QUERY = "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n"; + private static final String LONG_PERIODIC_COMMIT_QUERY_TEMPLATE = + "USING PERIODIC COMMIT 1 " + + "LOAD CSV FROM '%s' AS line " + + "UNWIND range(1, " + LOAD_CSV_BATCH_SIZE + ") AS index " + + "CREATE (n:Node {id: index, name: line[0], occupation: line[1]})"; + + private static final int STRESS_TEST_THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2; + private static final long STRESS_TEST_DURATION_MS = SECONDS.toMillis( 5 ); + private static final String[] STRESS_TEST_QUERIES = {SHORT_QUERY_1, SHORT_QUERY_2, LONG_QUERY}; + + @Rule + public final TestNeo4j neo4j = new TestNeo4j(); + + private ExecutorService executor; + + @Before + public void setUp() + { + executor = Executors.newCachedThreadPool( daemon( getClass().getSimpleName() + "-thread" ) ); + } + + @After + public void tearDown() + { + if ( executor != null ) + { + executor.shutdownNow(); + } + } + + @Test + public void shouldTerminateAutoCommitQuery() throws Exception + { + testQueryTermination( LONG_QUERY, true ); + } + + @Test + public void shouldTerminateQueryInExplicitTransaction() throws Exception + { + testQueryTermination( LONG_QUERY, false ); + } + + /** + * It is currently unsafe to terminate periodic commit query because it'll then be half-committed. + */ + @Test + public void shouldNotTerminatePeriodicCommitQuery() throws Exception + { + Future queryResult = runQueryInDifferentThreadAndResetSession( longPeriodicCommitQuery(), true ); + + try + { + queryResult.get( 1, MINUTES ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( Neo4jException.class ) ); + } + awaitNoActiveQueries(); + + assertEquals( CSV_FILE_SIZE * LOAD_CSV_BATCH_SIZE, countNodes() ); + } + + @Test + public void shouldTerminateAutoCommitQueriesRandomly() throws Exception + { + testRandomQueryTermination( true ); + } + + @Test + public void shouldTerminateQueriesInExplicitTransactionsRandomly() throws Exception + { + testRandomQueryTermination( false ); + } + + @Test + public void shouldNotAllowBeginTxIfResetFailureIsNotConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + try ( Session session = neo4j.driver().session() ) + { + Transaction tx1 = session.beginTransaction(); + + tx1.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + session.beginTransaction(); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "You cannot begin a transaction on a session with an open transaction" ) ); + } + + try + { + tx1.run( "RETURN 1" ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); + } + } + } + + @Test + public void shouldThrowExceptionOnCloseIfResetFailureIsNotConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + Session session = neo4j.driver().session(); + session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } + } + + @Test + public void shouldBeAbleToBeginTxAfterResetFailureIsConsumed() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + + try ( Session session = neo4j.driver().session() ) + { + Transaction tx1 = session.beginTransaction(); + + StatementResult procedureResult = tx1.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", 10 ) ); + + awaitActiveQueriesToContain( "CALL test.driver.longRunningStatement" ); + session.reset(); + + try + { + procedureResult.consume(); + fail( "Should procedure throw an exception as we interrupted procedure call" ); + } + catch ( Neo4jException e ) + { + assertThat( e.getMessage(), containsString( "The transaction has been terminated" ) ); + } + finally + { + tx1.close(); + } + + try ( Transaction tx2 = session.beginTransaction() ) + { + tx2.run( "CREATE (n:FirstNode)" ); + tx2.success(); + } + + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + MatcherAssert.assertThat( nodes, equalTo( 1L ) ); + } + } + + @Test + public void shouldKillLongRunningStatement() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "CALL test.driver.longRunningStatement({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + result.consume();// blocking to run the statement + + fail( "Should have got an exception about statement get killed." ); + } + catch ( Neo4jException e ) + { + endTime = System.currentTimeMillis(); + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + catch ( Exception e ) + { + fail( "Should be a Neo4jException" ); + } + } + + @Test + public void shouldKillLongStreamingResult() throws Throwable + { + neo4j.ensureProcedures( "longRunningStatement.jar" ); + // Given + int executionTimeout = 10; // 10s + final int killTimeout = 1; // 1s + long startTime = -1, endTime; + int recordCount = 0; + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "CALL test.driver.longStreamingResult({seconds})", + parameters( "seconds", executionTimeout ) ); + + resetSessionAfterTimeout( session, killTimeout ); + + // When + startTime = System.currentTimeMillis(); + while ( result.hasNext() ) + { + result.next(); + recordCount++; + } + + fail( "Should have got an exception about streaming get killed." ); + } + catch ( ClientException e ) + { + endTime = System.currentTimeMillis(); + assertThat( e.code(), equalTo( "Neo.ClientError.Procedure.ProcedureCallFailed" ) ); + assertThat( recordCount, greaterThan( 1 ) ); + + assertTrue( startTime > 0 ); + assertTrue( endTime - startTime > killTimeout * 1000 ); // get reset by session.reset + assertTrue( endTime - startTime < executionTimeout * 1000 / 2 ); // finished before execution finished + } + } + + private void resetSessionAfterTimeout( Session session, int timeout ) + { + executor.submit( () -> + { + try + { + Thread.sleep( timeout * 1000 ); // let the statement executing for timeout seconds + } + catch ( InterruptedException ignore ) + { + } + finally + { + session.reset(); // reset the session after timeout + } + } ); + } + + @Test + public void shouldAllowMoreStatementAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + + session.run( "RETURN 1" ).consume(); + + // When reset the state of this session + session.reset(); + + // Then can run successfully more statements without any error + session.run( "RETURN 2" ).consume(); + } + } + + @Test + public void shouldAllowMoreTxAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 1" ); + tx.success(); + } + + // When reset the state of this session + session.reset(); + + // Then can run more Tx + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 2" ); + tx.success(); + } + } + } + + @SuppressWarnings( "deprecation" ) + @Test + public void shouldMarkTxAsFailedAndDisallowRunAfterSessionReset() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction tx = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + // Then + tx.run( "RETURN 1" ); + fail( "Should not allow tx run as tx is already failed." ); + } + catch ( Exception e ) + { + assertThat( e.getMessage(), startsWith( "Cannot run more statements in this transaction" ) ); + } + } + } + + @SuppressWarnings( "deprecation" ) + @Test + public void shouldAllowMoreTxAfterSessionResetInTx() + { + // Given + try ( Session session = neo4j.driver().session() ) + { + try ( Transaction ignore = session.beginTransaction() ) + { + // When reset the state of this session + session.reset(); + } + + // Then can run more Tx + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( "RETURN 2" ); + tx.success(); + } + } + } + + @Test + public void resetShouldStopQueryWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( session, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test + public void resetShouldStopTransactionWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + } + } + } ); + } + + @Test + public void resetShouldStopWriteTransactionWaitingForALock() throws Exception + { + // 3.1+ neo4j supports termination of queries that wait for a lock + assumeServerIs31OrLater(); + + AtomicInteger invocationsOfWork = new AtomicInteger(); + + testResetOfQueryWaitingForLock( new NodeIdUpdater() + { + @Override + public void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception + { + try ( Session session = driver.session() ) + { + usedSessionRef.set( session ); + latchToWait.await(); + + session.writeTransaction( tx -> + { + invocationsOfWork.incrementAndGet(); + StatementResult result = updateNodeId( tx, nodeId, newNodeId ); + result.consume(); + return null; + } ); + } + } + } ); + + assertEquals( 1, invocationsOfWork.get() ); + } + + @Test + public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() + { + try ( Session session = neo4j.driver().session() ) + { + // Given + session.reset(); + + // When + Transaction tx = session.beginTransaction(); + tx.run( "CREATE (n:FirstNode)" ); + tx.success(); + tx.close(); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } + } + + @Test + public void shouldHandleResetBeforeRun() + { + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + session.reset(); + + try + { + tx.run( "CREATE (n:FirstNode)" ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertThat( e.getMessage(), + containsString( "Cannot run more statements in this transaction, it has been terminated" ) ); + } + } + } + + @Test + public void shouldHandleResetFromMultipleThreads() throws Throwable + { + Session session = neo4j.driver().session(); + + CountDownLatch beforeCommit = new CountDownLatch( 1 ); + CountDownLatch afterReset = new CountDownLatch( 1 ); + + executor.submit( () -> + { + try ( Transaction tx1 = session.beginTransaction() ) + { + tx1.run( "CREATE (n:FirstNode)" ); + beforeCommit.countDown(); + afterReset.await(); + } + + try ( Transaction tx2 = session.beginTransaction() ) + { + tx2.run( "CREATE (n:FirstNode)" ); + tx2.success(); + } + + return null; + } ); + + executor.submit( () -> + { + beforeCommit.await(); + session.reset(); + afterReset.countDown(); + return null; + } ); + + executor.shutdown(); + executor.awaitTermination( 10, SECONDS ); + + // Then the outcome of both statements should be visible + StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); + long nodes = result.single().get( "count(n)" ).asLong(); + assertThat( nodes, equalTo( 1L ) ); + } + + private void testResetOfQueryWaitingForLock( NodeIdUpdater nodeIdUpdater ) throws Exception + { + int nodeId = 42; + int newNodeId1 = 4242; + int newNodeId2 = 424242; + + createNodeWithId( nodeId ); + + CountDownLatch nodeLocked = new CountDownLatch( 1 ); + AtomicReference otherSessionRef = new AtomicReference<>(); + + try ( Session session = neo4j.driver().session(); + Transaction tx = session.beginTransaction() ) + { + Future txResult = nodeIdUpdater.update( nodeId, newNodeId1, otherSessionRef, nodeLocked ); + + StatementResult result = updateNodeId( tx, nodeId, newNodeId2 ); + result.consume(); + tx.success(); + + nodeLocked.countDown(); + // give separate thread some time to block on a lock + Thread.sleep( 2_000 ); + otherSessionRef.get().reset(); + + assertTransactionTerminated( txResult ); + } + + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n) RETURN n.id AS id" ); + int value = result.single().get( "id" ).asInt(); + assertEquals( newNodeId2, value ); + } + } + + private void createNodeWithId( int id ) + { + try ( Session session = neo4j.driver().session() ) + { + session.run( "CREATE (n {id: {id}})", parameters( "id", id ) ); + } + } + + private static StatementResult updateNodeId( StatementRunner statementRunner, int currentId, int newId ) + { + return statementRunner.run( "MATCH (n {id: {currentId}}) SET n.id = {newId}", + parameters( "currentId", currentId, "newId", newId ) ); + } + + private static void assertTransactionTerminated( Future work ) throws Exception + { + try + { + work.get( 20, TimeUnit.SECONDS ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), CoreMatchers.instanceOf( TransientException.class ) ); + assertThat( e.getCause().getMessage(), startsWith( "The transaction has been terminated" ) ); + } + } + + private void testRandomQueryTermination( boolean autoCommit ) throws Exception + { + Set runningSessions = newSetFromMap( new ConcurrentHashMap<>() ); + AtomicBoolean stop = new AtomicBoolean(); + List> futures = new ArrayList<>(); + + for ( int i = 0; i < STRESS_TEST_THREAD_COUNT; i++ ) + { + futures.add( executor.submit( () -> + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + while ( !stop.get() ) + { + runRandomQuery( autoCommit, random, runningSessions, stop ); + } + } ) ); + } + + long deadline = System.currentTimeMillis() + STRESS_TEST_DURATION_MS; + while ( !stop.get() ) + { + if ( System.currentTimeMillis() > deadline ) + { + stop.set( true ); + } + + resetAny( runningSessions ); + + MILLISECONDS.sleep( 30 ); + } + + awaitAll( futures ); + awaitNoActiveQueries(); + } + + private void runRandomQuery( boolean autoCommit, Random random, Set runningSessions, AtomicBoolean stop ) + { + try + { + Session session = neo4j.driver().session(); + runningSessions.add( session ); + try + { + String query = STRESS_TEST_QUERIES[random.nextInt( STRESS_TEST_QUERIES.length - 1 )]; + runQuery( session, query, autoCommit ); + } + finally + { + runningSessions.remove( session ); + session.close(); + } + } + catch ( Throwable error ) + { + if ( !stop.get() && !isAcceptable( error ) ) + { + stop.set( true ); + throw error; + } + // else it is fine to receive some errors from the driver because + // sessions are being reset concurrently by the main thread, driver can also be closed concurrently + } + } + + private void testQueryTermination( String query, boolean autoCommit ) throws Exception + { + Future queryResult = runQueryInDifferentThreadAndResetSession( query, autoCommit ); + + try + { + queryResult.get( 10, SECONDS ); + fail( "Exception expected" ); + } + catch ( ExecutionException e ) + { + assertThat( e.getCause(), instanceOf( Neo4jException.class ) ); + } + + awaitNoActiveQueries(); + } + + private Future runQueryInDifferentThreadAndResetSession( String query, boolean autoCommit ) + { + AtomicReference sessionRef = new AtomicReference<>(); + + Future queryResult = runAsync( () -> + { + Session session = neo4j.driver().session(); + sessionRef.set( session ); + runQuery( session, query, autoCommit ); + } ); + + awaitActiveQueriesToContain( query ); + + Session session = sessionRef.get(); + assertNotNull( session ); + session.reset(); + + return queryResult; + } + + private static void runQuery( Session session, String query, boolean autoCommit ) + { + if ( autoCommit ) + { + session.run( query ).consume(); + } + else + { + try ( Transaction tx = session.beginTransaction() ) + { + tx.run( query ); + tx.success(); + } + } + } + + private void awaitNoActiveQueries() + { + awaitCondition( () -> activeQueryCount( neo4j.driver() ) == 0 ); + } + + private void awaitActiveQueriesToContain( String value ) + { + awaitCondition( () -> + activeQueryNames( neo4j.driver() ).stream().anyMatch( query -> query.contains( value ) ) ); + } + + private long countNodes() + { + try ( Session session = neo4j.driver().session() ) + { + StatementResult result = session.run( "MATCH (n) RETURN count(n) AS result" ); + return result.single().get( 0 ).asLong(); + } + } + + private static void resetAny( Set sessions ) + { + sessions.stream().findAny().ifPresent( session -> + { + if ( sessions.remove( session ) ) + { + resetSafely( session ); + } + } ); + } + + private static void resetSafely( Session session ) + { + try + { + if ( session.isOpen() ) + { + session.reset(); + } + } + catch ( ClientException e ) + { + if ( session.isOpen() ) + { + throw e; + } + // else this thread lost race with close and it's fine + } + } + + private static boolean isAcceptable( Throwable error ) + { + // get the root cause + while ( error.getCause() != null ) + { + error = error.getCause(); + } + + return isTransactionTerminatedException( error ) || + error instanceof ServiceUnavailableException || + error instanceof ClientException || + error instanceof ClosedChannelException; + } + + private static boolean isTransactionTerminatedException( Throwable error ) + { + return error instanceof TransientException && + error.getMessage().startsWith( "The transaction has been terminated" ); + } + + private static String longPeriodicCommitQuery() + { + URI fileUri = createTmpCsvFile(); + return String.format( LONG_PERIODIC_COMMIT_QUERY_TEMPLATE, fileUri ); + } + + private static URI createTmpCsvFile() + { + try + { + Path importDir = Paths.get( HOME_DIR, TEST_SETTINGS.propertiesMap().get( IMPORT_DIR ) ); + Path csvFile = Files.createTempFile( importDir, "test", ".csv" ); + Iterable lines = range( 0, CSV_FILE_SIZE ).mapToObj( i -> "Foo-" + i + ", Bar-" + i )::iterator; + return URI.create( "file:///" + Files.write( csvFile, lines ).getFileName() ); + } + catch ( IOException e ) + { + throw new UncheckedIOException( e ); + } + } + + private static void awaitAll( List> futures ) throws Exception + { + for ( Future future : futures ) + { + assertNull( future.get( 1, MINUTES ) ); + } + } + + private void assumeServerIs31OrLater() + { + ServerVersion serverVersion = ServerVersion.version( neo4j.driver() ); + assumeTrue( "Ignored on `" + serverVersion + "`", serverVersion.greaterThanOrEqual( v3_1_0 ) ); + } + + private abstract class NodeIdUpdater + { + final Future update( int nodeId, int newNodeId, AtomicReference usedSessionRef, + CountDownLatch latchToWait ) + { + return executor.submit( () -> + { + performUpdate( neo4j.driver(), nodeId, newNodeId, usedSessionRef, latchToWait ); + return null; + } ); + } + + abstract void performUpdate( Driver driver, int nodeId, int newNodeId, + AtomicReference usedSessionRef, CountDownLatch latchToWait ) throws Exception; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index 617dc5cb24..c9ebc981cd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -809,7 +809,6 @@ public void shouldFailToCommitAfterTermination() { assertEquals( "Can't commit, transaction has been terminated", e.getMessage() ); } - assertFalse( tx.isOpen() ); } @Test @@ -1283,6 +1282,32 @@ public void shouldAllowUsingBlockingApiInCommonPoolWhenChaining() assertEquals( 1, countNodes( 42 ) ); } + @Test + public void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() + { + Transaction tx1 = await( session.beginTransactionAsync() ); + ((ExplicitTransaction) tx1).markTerminated(); + + try + { + // commit should fail, make session forget about this transaction and release the connection to the pool + await( tx1.commitAsync() ); + fail( "Exception expected" ); + } + catch ( ClientException e ) + { + assertEquals( "Can't commit, transaction has been terminated", e.getMessage() ); + } + + await( session.beginTransactionAsync() + .thenCompose( tx -> tx.runAsync( "CREATE (:Node {id: 42})" ) + .thenCompose( StatementResultCursor::consumeAsync ) + .thenApply( ignore -> tx ) + ).thenCompose( Transaction::commitAsync ) ); + + assertEquals( 1, countNodes( 42 ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index f3653fab24..7cf949e3dd 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -24,9 +24,6 @@ import org.junit.rules.ExpectedException; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; @@ -61,8 +58,6 @@ public class TransactionIT @Rule public TestNeo4jSession session = new TestNeo4jSession(); - private Transaction globalTx; - @Test public void shouldRunAndCommit() throws Throwable { @@ -232,95 +227,6 @@ public void shouldHandleNullMapParameters() throws Throwable // Then it wasn't the end of the world as we know it } - @SuppressWarnings( "deprecation" ) - @Test - public void shouldBeAbleToRunMoreStatementsAfterResetOnNoErrorState() throws Throwable - { - // Given - session.reset(); - - // When - Transaction tx = session.beginTransaction(); - tx.run( "CREATE (n:FirstNode)" ); - tx.success(); - tx.close(); - - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldHandleResetBeforeRun() throws Throwable - { - // Expect - exception.expect( ClientException.class ); - exception.expectMessage( "Cannot run more statements in this transaction, it has been terminated" ); - // When - Transaction tx = session.beginTransaction(); - session.reset(); - tx.run( "CREATE (n:FirstNode)" ); - } - - @SuppressWarnings( "deprecation" ) - @Test - public void shouldHandleResetFromMultipleThreads() throws Throwable - { - // When - ExecutorService runner = Executors.newFixedThreadPool( 2 ); - runner.execute( new Runnable() - - { - @Override - public void run() - { - globalTx = session.beginTransaction(); - globalTx.run( "CREATE (n:FirstNode)" ); - try - { - Thread.sleep( 1000 ); - } - catch ( InterruptedException e ) - { - throw new AssertionError( e ); - } - - globalTx = session.beginTransaction(); - globalTx.run( "CREATE (n:FirstNode)" ); - globalTx.success(); - globalTx.close(); - - } - } ); - runner.execute( new Runnable() - - { - @Override - public void run() - { - try - { - Thread.sleep( 500 ); - } - catch ( InterruptedException e ) - { - throw new AssertionError( e ); - } - - session.reset(); - } - } ); - - runner.awaitTermination( 5, TimeUnit.SECONDS ); - - // Then the outcome of both statements should be visible - StatementResult result = session.run( "MATCH (n) RETURN count(n)" ); - long nodes = result.single().get( "count(n)" ).asLong(); - assertThat( nodes, equalTo( 1L ) ); - } - @Test public void shouldRollBackTxIfErrorWithoutConsume() throws Throwable { diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index 8634b421b8..db11cd18c7 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -30,7 +30,9 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ResponseHandler; @@ -39,9 +41,12 @@ import org.neo4j.driver.v1.StatementResult; import static java.util.Collections.emptyMap; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -49,6 +54,8 @@ public final class TestUtil { + private static final long DEFAULT_WAIT_TIME_MS = MINUTES.toMillis( 1 ); + private TestUtil() { } @@ -84,7 +91,7 @@ public static > T await( U future ) { try { - return future.get( 5, MINUTES ); + return future.get( DEFAULT_WAIT_TIME_MS, MILLISECONDS ); } catch ( InterruptedException e ) { @@ -200,6 +207,50 @@ public static void interruptWhenInWaitingState( Thread thread ) } ); } + public static int activeQueryCount( Driver driver ) + { + return activeQueryNames( driver ).size(); + } + + public static List activeQueryNames( Driver driver ) + { + try ( Session session = driver.session() ) + { + return session.run( "CALL dbms.listQueries() YIELD query RETURN query" ) + .list() + .stream() + .map( record -> record.get( 0 ).asString() ) + .filter( query -> !query.contains( "dbms.listQueries" ) ) // do not include listQueries procedure + .collect( toList() ); + } + } + + public static void awaitCondition( BooleanSupplier condition ) + { + awaitCondition( condition, DEFAULT_WAIT_TIME_MS, MILLISECONDS ); + } + + public static void awaitCondition( BooleanSupplier condition, long value, TimeUnit unit ) + { + long deadline = System.currentTimeMillis() + unit.toMillis( value ); + while ( !condition.getAsBoolean() ) + { + if ( System.currentTimeMillis() > deadline ) + { + fail( "Condition was not met in time" ); + } + try + { + MILLISECONDS.sleep( 100 ); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + fail( "Interrupted while waiting" ); + } + } + } + private static void setupSuccessfulPullAll( Connection connection, String statement ) { doAnswer( invocation -> From f3ae1d4815377f1acd0e7518a58cfea991787f96 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 11 Jan 2018 18:16:44 +0100 Subject: [PATCH 5/5] Tests for commit and rollback error propagation Commit adds couple stub-server tests to assert that commit and rollback errors are correctly propagated. --- .../driver/v1/integration/TransactionIT.java | 57 +++++++++++++++++++ driver/src/test/resources/commit_error.script | 18 ++++++ .../src/test/resources/rollback_error.script | 18 ++++++ 3 files changed, 93 insertions(+) create mode 100644 driver/src/test/resources/commit_error.script create mode 100644 driver/src/test/resources/rollback_error.script diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java index 7cf949e3dd..be4741a759 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionIT.java @@ -28,8 +28,10 @@ import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.util.ChannelTrackingDriverFactory; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; @@ -37,6 +39,8 @@ import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.TransientException; +import org.neo4j.driver.v1.util.StubServer; import org.neo4j.driver.v1.util.TestNeo4jSession; import org.neo4j.driver.v1.util.TestUtil; @@ -385,6 +389,18 @@ public void shouldThrowWhenConnectionKilledDuringTransactionMarkedForSuccess() testFailWhenConnectionKilledDuringTransaction( true ); } + @Test + public void shouldThrowCommitError() throws Exception + { + testTxCloseErrorPropagation( "commit_error.script", true, "Unable to commit" ); + } + + @Test + public void shouldThrowRollbackError() throws Exception + { + testTxCloseErrorPropagation( "rollback_error.script", false, "Unable to rollback" ); + } + private void testFailWhenConnectionKilledDuringTransaction( boolean markForSuccess ) { ChannelTrackingDriverFactory factory = new ChannelTrackingDriverFactory( 1, Clock.SYSTEM ); @@ -420,4 +436,45 @@ private void testFailWhenConnectionKilledDuringTransaction( boolean markForSucce assertEquals( 0, session.run( "MATCH (n:MyNode {id: 1}) RETURN count(n)" ).single().get( 0 ).asInt() ); } + + private static void testTxCloseErrorPropagation( String script, boolean commit, String expectedErrorMessage ) + throws Exception + { + StubServer server = StubServer.start( script, 9001 ); + try + { + Config config = Config.build().withLogging( DEV_NULL_LOGGING ).withoutEncryption().toConfig(); + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", AuthTokens.none(), config ); + Session session = driver.session() ) + { + Transaction tx = session.beginTransaction(); + StatementResult result = tx.run( "CREATE (n {name:'Alice'}) RETURN n.name AS name" ); + assertEquals( "Alice", result.single().get( "name" ).asString() ); + + if ( commit ) + { + tx.success(); + } + else + { + tx.failure(); + } + + try + { + tx.close(); + fail( "Exception expected" ); + } + catch ( TransientException e ) + { + assertEquals( "Neo.TransientError.General.DatabaseUnavailable", e.code() ); + assertEquals( expectedErrorMessage, e.getMessage() ); + } + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } } diff --git a/driver/src/test/resources/commit_error.script b/driver/src/test/resources/commit_error.script new file mode 100644 index 0000000000..1c4ac21674 --- /dev/null +++ b/driver/src/test/resources/commit_error.script @@ -0,0 +1,18 @@ +!: AUTO INIT +!: AUTO RESET + +C: RUN "BEGIN" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Alice'}) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + SUCCESS {} +C: RUN "COMMIT" {} + PULL_ALL +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to commit"} + IGNORED +C: ACK_FAILURE +S: SUCCESS {} diff --git a/driver/src/test/resources/rollback_error.script b/driver/src/test/resources/rollback_error.script new file mode 100644 index 0000000000..637be70b2a --- /dev/null +++ b/driver/src/test/resources/rollback_error.script @@ -0,0 +1,18 @@ +!: AUTO INIT +!: AUTO RESET + +C: RUN "BEGIN" {} + PULL_ALL +S: SUCCESS {} + SUCCESS {} +C: RUN "CREATE (n {name:'Alice'}) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + SUCCESS {} +C: RUN "ROLLBACK" {} + PULL_ALL +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to rollback"} + IGNORED +C: ACK_FAILURE +S: SUCCESS {}