diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 00740f0177..f2397c3460 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -361,28 +362,12 @@ private BiFunction handleCommitOrRollback( Throwable cursor { return ( ignore, commitOrRollbackError ) -> { - if ( cursorFailure != null && commitOrRollbackError != null ) + CompletionException combinedError = Futures.combineErrors( cursorFailure, commitOrRollbackError ); + if ( combinedError != null ) { - Throwable cause1 = Futures.completionExceptionCause( cursorFailure ); - Throwable cause2 = Futures.completionExceptionCause( commitOrRollbackError ); - if ( cause1 != cause2 ) - { - cause1.addSuppressed( cause2 ); - } - throw Futures.asCompletionException( cause1 ); - } - else if ( cursorFailure != null ) - { - throw Futures.asCompletionException( cursorFailure ); - } - else if ( commitOrRollbackError != null ) - { - throw Futures.asCompletionException( commitOrRollbackError ); - } - else - { - return null; + throw combinedError; } + return null; }; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 1a90f3d5d7..a911e8dfe2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -167,24 +167,23 @@ public CompletionStage closeAsync() { return resultCursorStage.thenCompose( cursor -> { - if ( cursor == null ) + if ( cursor != null ) { - return completedWithNull(); + // there exists a cursor with potentially unconsumed error, try to extract and propagate it + return cursor.failureAsync(); } - return cursor.failureAsync(); - } ).thenCompose( error -> releaseResources().thenApply( ignore -> + // no result cursor exists so no error exists + return completedWithNull(); + } ).thenCompose( cursorError -> closeTransactionAndReleaseConnection().thenApply( txCloseError -> { - if ( error != null ) - { - // connection has been acquired and there is an unconsumed error in result cursor - throw Futures.asCompletionException( error ); - } - else + // now we have cursor error, active transaction has been closed and connection has been released + // back to the pool; try to propagate cursor and transaction close errors, if any + CompletionException combinedError = Futures.combineErrors( cursorError, txCloseError ); + if ( combinedError != null ) { - // either connection acquisition failed or - // there are no unconsumed errors in the result cursor - return null; + throw combinedError; } + return null; } ) ); } return completedWithNull(); @@ -520,26 +519,22 @@ private CompletionStage acquireConnection( AccessMode mode ) return newConnectionStage; } - private CompletionStage releaseResources() - { - return rollbackTransaction().thenCompose( ignore -> releaseConnection() ); - } - - private CompletionStage rollbackTransaction() + private CompletionStage closeTransactionAndReleaseConnection() { return existingTransactionOrNull().thenCompose( tx -> { if ( tx != null ) { - return tx.rollbackAsync(); + // there exists an open transaction, let's close it and propagate the error, if any + return tx.closeAsync() + .thenApply( ignore -> (Throwable) null ) + .exceptionally( error -> error ); } + // no open transaction so nothing to close return completedWithNull(); - } ).exceptionally( error -> - { - Throwable cause = Futures.completionExceptionCause( error ); - logger.warn( "Active transaction rolled back with an error", cause ); - return null; - } ); + } ).thenCompose( txCloseError -> + // then release the connection and propagate transaction close error, if any + releaseConnection().thenApply( ignore -> txCloseError ) ); } private CompletionStage releaseConnection() @@ -548,8 +543,10 @@ private CompletionStage releaseConnection() { if ( connection != null ) { + // there exists connection, try to release it back to the pool return connection.release(); } + // no connection so return null return completedWithNull(); } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java index 98eba5452c..e65be4f459 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/Futures.java @@ -172,6 +172,40 @@ public static CompletionException asCompletionException( Throwable error ) return new CompletionException( error ); } + /** + * Combine given errors into a single {@link CompletionException} to be rethrown from inside a + * {@link CompletionStage} chain. + * + * @param error1 the first error or {@code null}. + * @param error2 the second error or {@code null}. + * @return {@code null} if both errors are null, {@link CompletionException} otherwise. + */ + public static CompletionException combineErrors( Throwable error1, Throwable error2 ) + { + if ( error1 != null && error2 != null ) + { + Throwable cause1 = completionExceptionCause( error1 ); + Throwable cause2 = completionExceptionCause( error2 ); + if ( cause1 != cause2 ) + { + cause1.addSuppressed( cause2 ); + } + return asCompletionException( cause1 ); + } + else if ( error1 != null ) + { + return asCompletionException( error1 ); + } + else if ( error2 != null ) + { + return asCompletionException( error2 ); + } + else + { + return null; + } + } + private static void safeRun( Runnable runnable ) { try diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java index 0cad2f8efc..4c99824e88 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/FuturesTest.java @@ -37,6 +37,7 @@ import org.neo4j.driver.internal.async.EventLoopGroupFactory; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -360,4 +361,43 @@ public void shouldKeepCompletionExceptionAsIs() CompletionException error = new CompletionException( new RuntimeException( "Hello" ) ); assertEquals( error, Futures.asCompletionException( error ) ); } + + @Test + public void shouldCombineTwoErrors() + { + RuntimeException error1 = new RuntimeException( "Error1" ); + RuntimeException error2Cause = new RuntimeException( "Error2" ); + CompletionException error2 = new CompletionException( error2Cause ); + + CompletionException combined = Futures.combineErrors( error1, error2 ); + + assertEquals( error1, combined.getCause() ); + assertArrayEquals( new Throwable[]{error2Cause}, combined.getCause().getSuppressed() ); + } + + @Test + public void shouldCombineErrorAndNull() + { + RuntimeException error1 = new RuntimeException( "Error1" ); + + CompletionException combined = Futures.combineErrors( error1, null ); + + assertEquals( error1, combined.getCause() ); + } + + @Test + public void shouldCombineNullAndError() + { + RuntimeException error2 = new RuntimeException( "Error2" ); + + CompletionException combined = Futures.combineErrors( null, error2 ); + + assertEquals( error2, combined.getCause() ); + } + + @Test + public void shouldCombineNullAndNullErrors() + { + assertNull( Futures.combineErrors( null, null ) ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 4b45abc90e..8cd50023a1 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -37,16 +37,19 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.internal.retry.RetrySettings; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; import org.neo4j.driver.internal.util.DriverFactoryWithOneEventLoopThread; import org.neo4j.driver.internal.util.ServerVersion; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.AuthToken; +import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; @@ -62,6 +65,7 @@ import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; +import org.neo4j.driver.v1.util.StubServer; import org.neo4j.driver.v1.util.TestNeo4j; import org.neo4j.driver.v1.util.TestUtil; @@ -1296,6 +1300,50 @@ public void shouldNotAllowStartingMultipleTransactions() } } + @Test + public void shouldCloseOpenTransactionWhenClosed() + { + try ( Session session = neo4j.driver().session() ) + { + Transaction tx = session.beginTransaction(); + tx.run( "CREATE (:Node {id: 123})" ); + tx.run( "CREATE (:Node {id: 456})" ); + + tx.success(); + } + + assertEquals( 1, countNodesWithId( 123 ) ); + assertEquals( 1, countNodesWithId( 456 ) ); + } + + @Test + public void shouldRollbackOpenTransactionWhenClosed() + { + try ( Session session = neo4j.driver().session() ) + { + Transaction tx = session.beginTransaction(); + tx.run( "CREATE (:Node {id: 123})" ); + tx.run( "CREATE (:Node {id: 456})" ); + + tx.failure(); + } + + assertEquals( 0, countNodesWithId( 123 ) ); + assertEquals( 0, countNodesWithId( 456 ) ); + } + + @Test + public void shouldPropagateTransactionCommitErrorWhenClosed() throws Exception + { + testTransactionCloseErrorPropagationWhenSessionClosed( "commit_error.script", true, "Unable to commit" ); + } + + @Test + public void shouldPropagateTransactionRollbackErrorWhenClosed() throws Exception + { + testTransactionCloseErrorPropagationWhenSessionClosed( "rollback_error.script", false, "Unable to rollback" ); + } + private void testExecuteReadTx( AccessMode sessionMode ) { Driver driver = neo4j.driver(); @@ -1501,6 +1549,52 @@ private static void await( CountDownLatch latch ) } } + private static void testTransactionCloseErrorPropagationWhenSessionClosed( String script, boolean commit, + String expectedErrorMessage ) throws Exception + { + StubServer server = StubServer.start( script, 9001 ); + try + { + Config config = Config.build() + .withLogging( DEV_NULL_LOGGING ) + .withLogging( new ConsoleLogging( Level.INFO ) ) + .withoutEncryption() + .toConfig(); + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:9001", AuthTokens.none(), config ) ) + { + Session session = driver.session(); + + Transaction tx = session.beginTransaction(); + StatementResult result = tx.run( "CREATE (n {name:'Alice'}) RETURN n.name AS name" ); + assertEquals( "Alice", result.single().get( "name" ).asString() ); + + if ( commit ) + { + tx.success(); + } + else + { + tx.failure(); + } + + try + { + session.close(); + fail( "Exception expected" ); + } + catch ( TransientException e ) + { + assertEquals( "Neo.TransientError.General.DatabaseUnavailable", e.code() ); + assertEquals( expectedErrorMessage, e.getMessage() ); + } + } + } + finally + { + assertEquals( 0, server.exitStatus() ); + } + } + private static class ThrowingWork implements TransactionWork { final String query;