From 65b0a34978306c48bd4daaa68ad859e21215a2f8 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Wed, 10 Nov 2021 11:28:46 +0000 Subject: [PATCH 1/2] Call close with the appropriate flag to commit or rollback on UnmanagedTransaction where possible to avoid double state acquisition Calling `close` instead of separate `isOpen` and `commitAsync` requires less lock acquisitions and is safer. --- .../internal/async/InternalAsyncSession.java | 54 ++++++++----------- .../internal/async/UnmanagedTransaction.java | 7 ++- .../internal/reactive/InternalRxSession.java | 2 +- .../reactive/InternalRxTransaction.java | 9 ++-- .../reactive/InternalRxTransactionTest.java | 4 +- 5 files changed, 35 insertions(+), 41 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java index 83f0f7a862..efc291933b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncSession.java @@ -146,7 +146,7 @@ private void executeWork(CompletableFuture resultFuture, UnmanagedTransac Throwable error = Futures.completionExceptionCause( completionError ); if ( error != null ) { - rollbackTxAfterFailedTransactionWork( tx, resultFuture, error ); + closeTxAfterFailedTransactionWork( tx, resultFuture, error ); } else { @@ -174,43 +174,33 @@ private CompletionStage safeExecuteWork(UnmanagedTransaction tx, AsyncTra } } - private void rollbackTxAfterFailedTransactionWork(UnmanagedTransaction tx, CompletableFuture resultFuture, Throwable error ) + private void closeTxAfterFailedTransactionWork( UnmanagedTransaction tx, CompletableFuture resultFuture, Throwable error ) { - if ( tx.isOpen() ) - { - tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) -> { - if ( rollbackError != null ) + tx.closeAsync().whenComplete( + ( ignored, rollbackError ) -> { - error.addSuppressed( rollbackError ); - } - resultFuture.completeExceptionally( error ); - } ); - } - else - { - resultFuture.completeExceptionally( error ); - } + if ( rollbackError != null ) + { + error.addSuppressed( rollbackError ); + } + resultFuture.completeExceptionally( error ); + } ); } private void closeTxAfterSucceededTransactionWork(UnmanagedTransaction tx, CompletableFuture resultFuture, T result ) { - if ( tx.isOpen() ) - { - tx.commitAsync().whenComplete( ( ignore, completionError ) -> { - Throwable commitError = Futures.completionExceptionCause( completionError ); - if ( commitError != null ) + tx.closeAsync( true ).whenComplete( + ( ignored, completionError ) -> { - resultFuture.completeExceptionally( commitError ); - } - else - { - resultFuture.complete( result ); - } - } ); - } - else - { - resultFuture.complete( result ); - } + Throwable commitError = Futures.completionExceptionCause( completionError ); + if ( commitError != null ) + { + resultFuture.completeExceptionally( commitError ); + } + else + { + resultFuture.complete( result ); + } + } ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 03fdb0e7ca..fbd7a985c7 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -134,7 +134,12 @@ else if ( beginError instanceof ConnectionReadTimeoutException ) public CompletionStage closeAsync() { - return closeAsync( false, true ); + return closeAsync( false ); + } + + public CompletionStage closeAsync( boolean commit ) + { + return closeAsync( commit, true ); } public CompletionStage commitAsync() diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index 41f70d0a1d..222b64562d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -130,7 +130,7 @@ public Publisher writeTransaction( RxTransactionWork Publisher runTransaction( AccessMode mode, RxTransactionWork> work, TransactionConfig config ) { Flux repeatableWork = Flux.usingWhen( beginTransaction( mode, config ), work::execute, - InternalRxTransaction::commitIfOpen, ( tx, error ) -> tx.close(), InternalRxTransaction::close ); + tx -> tx.close( true ), ( tx, error ) -> tx.close(), InternalRxTransaction::close ); return session.retryLogic().retryRx( repeatableWork ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index c1a9267336..b4212ae963 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -30,7 +30,6 @@ import org.neo4j.driver.reactive.RxTransaction; import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; -import static org.neo4j.driver.internal.util.Futures.completedWithNull; public class InternalRxTransaction extends AbstractRxQueryRunner implements RxTransaction { @@ -77,13 +76,13 @@ public Publisher rollback() return createEmptyPublisher( tx::rollbackAsync ); } - Publisher commitIfOpen() + Publisher close() { - return createEmptyPublisher( () -> tx.isOpen() ? tx.commitAsync() : completedWithNull() ); + return close( false ); } - Publisher close() + Publisher close( boolean commit ) { - return createEmptyPublisher( tx::closeAsync ); + return createEmptyPublisher( () -> tx.closeAsync( commit ) ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index 1accde96db..b2ae93245a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -147,7 +147,7 @@ void shouldCommitWhenOpen() when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); - Publisher publisher = rxTx.commitIfOpen(); + Publisher publisher = rxTx.close( true ); StepVerifier.create( publisher ).verifyComplete(); verify( tx ).commitAsync(); @@ -161,7 +161,7 @@ void shouldNotCommitWhenNotOpen() when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); - Publisher publisher = rxTx.commitIfOpen(); + Publisher publisher = rxTx.close( true ); StepVerifier.create( publisher ).verifyComplete(); verify( tx, never() ).commitAsync(); From 4071d5fbdb3878167b6f43641a6fc2dde3ad2cae Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Wed, 10 Nov 2021 12:12:10 +0000 Subject: [PATCH 2/2] Update tests --- .../async/UnmanagedTransactionTest.java | 16 ++++-- .../reactive/InternalRxSessionTest.java | 56 +++++++++---------- .../reactive/InternalRxTransactionTest.java | 26 ++------- 3 files changed, 43 insertions(+), 55 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index 6a29c338bf..f639565a66 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -413,15 +413,21 @@ void shouldReturnFailingStageOnConflictingCompletingAction( boolean protocolComm private static Stream closingNotActionTransactionArgs() { return Stream.of( - Arguments.of( true, 1, "commit" ), - Arguments.of( false, 1, "rollback" ), - Arguments.of( false, 0, "terminate" ) + Arguments.of( true, 1, "commit", null ), + Arguments.of( false, 1, "rollback", null ), + Arguments.of( false, 0, "terminate", null ), + Arguments.of( true, 1, "commit", true ), + Arguments.of( false, 1, "rollback", true ), + Arguments.of( true, 1, "commit", false ), + Arguments.of( false, 1, "rollback", false ), + Arguments.of( false, 0, "terminate", false ) ); } @ParameterizedTest @MethodSource( "closingNotActionTransactionArgs" ) - void shouldReturnCompletedWithNullStageOnClosingNotActiveTransaction( boolean protocolCommit, int expectedProtocolInvocations, String originalAction ) + void shouldReturnCompletedWithNullStageOnClosingInactiveTransactionExceptCommittingAborted( + boolean protocolCommit, int expectedProtocolInvocations, String originalAction, Boolean commitOnClose ) { Connection connection = mock( Connection.class ); BoltProtocol protocol = mock( BoltProtocol.class ); @@ -431,7 +437,7 @@ void shouldReturnCompletedWithNullStageOnClosingNotActiveTransaction( boolean pr UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); CompletionStage originalActionStage = mapTransactionAction( originalAction, tx ).get(); - CompletionStage closeStage = tx.closeAsync(); + CompletionStage closeStage = commitOnClose != null ? tx.closeAsync( commitOnClose ) : tx.closeAsync(); assertTrue( originalActionStage.toCompletableFuture().isDone() ); assertFalse( originalActionStage.toCompletableFuture().isCompletedExceptionally() ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java index 3f320231ab..2ca1ea21fe 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxSessionTest.java @@ -199,9 +199,7 @@ void shouldDelegateRunTx( Function> runTx ) throws T // Given NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.isOpen() ).thenReturn( true ); - when( tx.commitAsync() ).thenReturn( completedWithNull() ); - when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); + when( tx.closeAsync( true ) ).thenReturn( completedWithNull() ); when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) ); when( session.retryLogic() ).thenReturn( new FixedRetryLogic( 1 ) ); @@ -213,7 +211,7 @@ void shouldDelegateRunTx( Function> runTx ) throws T // Then verify( session ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ); - verify( tx ).commitAsync(); + verify( tx ).closeAsync( true ); } @Test @@ -223,25 +221,24 @@ void shouldRetryOnError() throws Throwable int retryCount = 2; NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.isOpen() ).thenReturn( true ); - when( tx.commitAsync() ).thenReturn( completedWithNull() ); - when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); + when( tx.closeAsync( false ) ).thenReturn( completedWithNull() ); when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) ); when( session.retryLogic() ).thenReturn( new FixedRetryLogic( retryCount ) ); InternalRxSession rxSession = new InternalRxSession( session ); // When - Publisher strings = rxSession.readTransaction( t -> - Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ) ); + Publisher strings = rxSession.readTransaction( + t -> + Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ) ); StepVerifier.create( Flux.from( strings ) ) - // we lost the "a"s too as the user only see the last failure - .expectError( RuntimeException.class ) - .verify(); + // we lost the "a"s too as the user only see the last failure + .expectError( RuntimeException.class ) + .verify(); // Then verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ); - verify( tx, times( retryCount + 1 ) ).closeAsync(); + verify( tx, times( retryCount + 1 ) ).closeAsync( false ); } @Test @@ -251,9 +248,8 @@ void shouldObtainResultIfRetrySucceed() throws Throwable int retryCount = 2; NetworkSession session = mock( NetworkSession.class ); UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.isOpen() ).thenReturn( true ); - when( tx.commitAsync() ).thenReturn( completedWithNull() ); - when( tx.rollbackAsync() ).thenReturn( completedWithNull() ); + when( tx.closeAsync( false ) ).thenReturn( completedWithNull() ); + when( tx.closeAsync( true ) ).thenReturn( completedWithNull() ); when( session.beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ) ).thenReturn( completedFuture( tx ) ); when( session.retryLogic() ).thenReturn( new FixedRetryLogic( retryCount ) ); @@ -261,23 +257,25 @@ void shouldObtainResultIfRetrySucceed() throws Throwable // When AtomicInteger count = new AtomicInteger(); - Publisher strings = rxSession.readTransaction( t -> { - // we fail for the first few retries, and then success on the last run. - if ( count.getAndIncrement() == retryCount ) - { - return Flux.just( "a" ); - } - else - { - return Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ); - } - } ); + Publisher strings = rxSession.readTransaction( + t -> + { + // we fail for the first few retries, and then success on the last run. + if ( count.getAndIncrement() == retryCount ) + { + return Flux.just( "a" ); + } + else + { + return Flux.just( "a" ).then( Mono.error( new RuntimeException( "Errored" ) ) ); + } + } ); StepVerifier.create( Flux.from( strings ) ).expectNext( "a" ).verifyComplete(); // Then verify( session, times( retryCount + 1 ) ).beginTransactionAsync( any( AccessMode.class ), any( TransactionConfig.class ) ); - verify( tx, times( retryCount ) ).closeAsync(); - verify( tx ).commitAsync(); + verify( tx, times( retryCount ) ).closeAsync( false ); + verify( tx ).closeAsync( true ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index b2ae93245a..5a9f0bb4b6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -48,7 +48,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.Values.parameters; @@ -140,43 +139,28 @@ void shouldMarkTxIfFailedToRun( Function runReturnOne ) } @Test - void shouldCommitWhenOpen() + void shouldDelegateConditionalClose() { UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.isOpen() ).thenReturn( true ); - when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); - - InternalRxTransaction rxTx = new InternalRxTransaction( tx ); - Publisher publisher = rxTx.close( true ); - StepVerifier.create( publisher ).verifyComplete(); - - verify( tx ).commitAsync(); - } - - @Test - void shouldNotCommitWhenNotOpen() - { - UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.isOpen() ).thenReturn( false ); - when( tx.commitAsync() ).thenReturn( Futures.completedWithNull() ); + when( tx.closeAsync( true ) ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); Publisher publisher = rxTx.close( true ); StepVerifier.create( publisher ).verifyComplete(); - verify( tx, never() ).commitAsync(); + verify( tx ).closeAsync( true ); } @Test void shouldDelegateClose() { UnmanagedTransaction tx = mock( UnmanagedTransaction.class ); - when( tx.closeAsync() ).thenReturn( Futures.completedWithNull() ); + when( tx.closeAsync( false ) ).thenReturn( Futures.completedWithNull() ); InternalRxTransaction rxTx = new InternalRxTransaction( tx ); Publisher publisher = rxTx.close(); StepVerifier.create( publisher ).verifyComplete(); - verify( tx ).closeAsync(); + verify( tx ).closeAsync( false ); } }