From c819103f6c4f18be583f936bb780d374d77ffe02 Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Mon, 8 Jun 2020 18:01:57 +0200 Subject: [PATCH 1/6] =?UTF-8?q?Don=E2=80=99t=20swallow=20the=20cause=20of?= =?UTF-8?q?=20a=20TX=20termination.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There are a couple of scenarions in which a transaction gets terminated. The cause will be propagated by the pull handler and the transaction will be marked accordingly. There might be a chance that a degration from a leader to a follower on the server side happens in between to calls to a run method on that transaction: The transaction is still open, but cannot run queries any longer. Outside a transactional function this leads correctly to a ClientException. Inside a transactional function, this must not happen. The retry logic must be able to find the cause of the termination and if there’s any, it should judge on the cause if it retries or not. This PR changes the following: - Add a StateHolder to the UnmangedTransaction - The holder is necessary to keep the single field volatie - The holder holds the state and a possible cause of termination - The holder is able to determine whether a session is still open or not. - It removes markTerminated from InternalAsyncTransaction as it was used only for tests. --- .../async/InternalAsyncTransaction.java | 5 - .../driver/internal/async/NetworkSession.java | 2 +- .../internal/async/UnmanagedTransaction.java | 73 ++++++++++---- ...sactionPullResponseCompletionListener.java | 2 +- .../reactive/InternalRxTransaction.java | 2 +- .../retry/ExponentialBackoffRetryLogic.java | 96 +++++++++++-------- .../integration/UnmanagedTransactionIT.java | 8 +- .../async/InternalAsyncTransactionTest.java | 19 ++-- .../async/UnmanagedTransactionTest.java | 8 +- ...ionPullResponseCompletionListenerTest.java | 2 +- ...ionPullResponseCompletionListenerTest.java | 2 +- .../reactive/InternalRxTransactionTest.java | 2 +- 12 files changed, 137 insertions(+), 84 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java index 2dd5851db9..64653904e3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java @@ -50,11 +50,6 @@ public CompletionStage runAsync(Query query) return tx.runAsync(query, true ); } - public void markTerminated() - { - tx.markTerminated(); - } - public boolean isOpen() { return tx.isOpen(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java index 79f0717974..3aa8d933a6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java @@ -138,7 +138,7 @@ public CompletionStage resetAsync() { if ( tx != null ) { - tx.markTerminated(); + tx.markTerminated( null ); } } ) .thenCompose( ignore -> connectionStage ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index d821029f3e..83da1ec368 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.async; +import java.util.EnumSet; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; @@ -55,7 +56,43 @@ private enum State COMMITTED, /** This transaction has been rolled back */ - ROLLED_BACK + ROLLED_BACK; + + private final StateHolder defaultStateHolder; + + State() + { + this.defaultStateHolder = new StateHolder(this, null); + } + } + + private static final EnumSet OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED ); + + /** + * This is a holder so that we can have ony the state volatile in the tx without having to synchronize the whole block. + */ + private static final class StateHolder { + + /** + * The actual state. + */ + final State value; + + /** + * If this holder contains a state of {@link State#TERMINATED}, this represents the cause if any. + */ + final Throwable causeOfTermination; + + StateHolder(State value, Throwable causeOfTermination) { + + this.value = value; + this.causeOfTermination = causeOfTermination; + } + + boolean isOpen() + { + return OPEN_STATES.contains( this.value ); + } } private final Connection connection; @@ -64,7 +101,7 @@ private enum State private final ResultCursorsHolder resultCursors; private final long fetchSize; - private volatile State state = State.ACTIVE; + private volatile StateHolder state = new StateHolder(State.ACTIVE, null); public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize ) { @@ -104,11 +141,11 @@ public CompletionStage closeAsync() public CompletionStage commitAsync() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) ); } @@ -122,11 +159,11 @@ else if ( state == State.ROLLED_BACK ) public CompletionStage rollbackAsync() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) ); } @@ -158,12 +195,12 @@ public CompletionStage runRx(Query query) public boolean isOpen() { - return state != State.COMMITTED && state != State.ROLLED_BACK; + return state.isOpen(); } - public void markTerminated() + public void markTerminated( Throwable cause ) { - state = State.TERMINATED; + state = new StateHolder(State.TERMINATED, cause); } public Connection connection() @@ -173,34 +210,34 @@ public Connection connection() private void ensureCanRunQueries() { - if ( state == State.COMMITTED ) + if ( state.value == State.COMMITTED ) { throw new ClientException( "Cannot run more queries in this transaction, it has been committed" ); } - else if ( state == State.ROLLED_BACK ) + else if ( state.value == State.ROLLED_BACK ) { throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" ); } - else if ( state == State.TERMINATED ) + else if ( state.value == State.TERMINATED ) { throw new ClientException( "Cannot run more queries in this transaction, " + - "it has either experienced an fatal error or was explicitly terminated" ); + "it has either experienced an fatal error or was explicitly terminated", state.causeOfTermination ); } } private CompletionStage doCommitAsync() { - if ( state == State.TERMINATED ) + if ( state.value == State.TERMINATED ) { return failedFuture( new ClientException( "Transaction can't be committed. " + - "It has been rolled back either because of an error or explicit termination" ) ); + "It has been rolled back either because of an error or explicit termination", state.causeOfTermination ) ); } return protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark ); } private CompletionStage doRollbackAsync() { - if ( state == State.TERMINATED ) + if ( state.value == State.TERMINATED ) { return completedWithNull(); } @@ -224,11 +261,11 @@ private void transactionClosed( boolean isCommitted ) { if ( isCommitted ) { - state = State.COMMITTED; + state = State.COMMITTED.defaultStateHolder; } else { - state = State.ROLLED_BACK; + state = State.ROLLED_BACK.defaultStateHolder; } connection.release(); // release in background } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java index e5103ad76d..760b9b4b7a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java @@ -45,6 +45,6 @@ public void afterFailure( Throwable error ) // always mark transaction as terminated because every error is "acknowledged" with a RESET message // so database forgets about the transaction after the first error // such transaction should not attempt to commit and can be considered as rolled back - tx.markTerminated(); + tx.markTerminated( error ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index 307c5160ee..0d755a63d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -56,7 +56,7 @@ public RxResult run(Query query) // The logic here shall be the same as `TransactionPullResponseHandler#afterFailure` as that is where cursor handling failure // This is optional as tx still holds a reference to all cursor futures and they will be clean up properly in commit Throwable error = Futures.completionExceptionCause( completionError ); - tx.markTerminated(); + tx.markTerminated( error ); cursorFuture.completeExceptionally( error ); } } ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index da88e3e041..a02304e46a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -39,6 +39,7 @@ import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -100,8 +101,9 @@ public T retry( Supplier work ) { return work.get(); } - catch ( Throwable error ) + catch ( Throwable throwable ) { + Throwable error = extractPossibleTerminationCause( throwable ); if ( canRetryOn( error ) ) { long currentTime = clock.millis(); @@ -122,8 +124,10 @@ public T retry( Supplier work ) continue; } } - addSuppressed( error, errors ); - throw error; + + // Add the original error in case we didn't continue the loop from within the if above. + addSuppressed( throwable, errors ); + throw throwable; } } } @@ -144,54 +148,67 @@ public Publisher retryRx( Publisher work ) protected boolean canRetryOn( Throwable error ) { - return error instanceof SessionExpiredException || - error instanceof ServiceUnavailableException || - isTransientError( error ); + return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || isTransientError( error ); + } + + /** + * Extracts the possible cause of a transaction that has been marked terminated. + * + * @param error + * @return + */ + private static Throwable extractPossibleTerminationCause( Throwable error ) + { + + // Having a dedicated "TerminatedException" inheriting from ClientException might be a good idea. + if ( error instanceof ClientException && error.getCause() != null ) + { + return error.getCause(); + } + return error; } private Function,Publisher> retryRxCondition() { - return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> { - Throwable lastError = t2.getT1(); + return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> + { + + Throwable throwable = t2.getT1(); + Throwable error = extractPossibleTerminationCause( throwable ); + Context ctx = t2.getT2(); List errors = ctx.getOrDefault( "errors", null ); - long startTime = ctx.getOrDefault( "startTime", -1L ); + long startTime = ctx.getOrDefault( "startTime", -1L ); long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs ); - if( !canRetryOn( lastError ) ) + if ( canRetryOn( error ) ) { - addSuppressed( lastError, errors ); - return Mono.error( lastError ); - } - - long currentTime = clock.millis(); - if ( startTime == -1 ) - { - startTime = currentTime; - } + long currentTime = clock.millis(); + if ( startTime == -1 ) + { + startTime = currentTime; + } - long elapsedTime = currentTime - startTime; - if ( elapsedTime < maxRetryTimeMs ) - { - long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); - log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", lastError ); + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); + log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); - nextDelayMs = (long) (nextDelayMs * multiplier); - errors = recordError( lastError, errors ); + nextDelayMs = (long) (nextDelayMs * multiplier); + errors = recordError( error, errors ); - // retry on netty event loop thread - EventExecutor eventExecutor = eventExecutorGroup.next(); - return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ) - .delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); + // retry on netty event loop thread + EventExecutor eventExecutor = eventExecutorGroup.next(); + return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement( + Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); + } } - else - { - addSuppressed( lastError, errors ); + addSuppressed( throwable, errors ); - return Mono.error( lastError ); - } + return Mono.error( throwable ); } ); } @@ -249,9 +266,10 @@ private void executeWork( CompletableFuture resultFuture, Supplier void retryOnError( CompletableFuture resultFuture, Supplier> work, - long startTime, long retryDelayMs, Throwable error, List errors ) + private void retryOnError( CompletableFuture resultFuture, Supplier> work, long startTime, long retryDelayMs, Throwable throwable, + List errors ) { + Throwable error = extractPossibleTerminationCause( throwable ); if ( canRetryOn( error ) ) { long currentTime = clock.millis(); @@ -269,8 +287,8 @@ private void retryOnError( CompletableFuture resultFuture, Supplier await( tx.commitAsync() ) ); assertThat( e.getMessage(), startsWith( "Transaction can't be committed" ) ); @@ -162,7 +162,7 @@ void shouldRollbackAfterTermination() { UnmanagedTransaction tx = beginTransaction(); - tx.markTerminated(); + tx.markTerminated( null ); assertNull( await( tx.rollbackAsync() ) ); assertFalse( tx.isOpen() ); @@ -173,7 +173,7 @@ void shouldFailToRunQueryWhenTerminated() { UnmanagedTransaction tx = beginTransaction(); txRun( tx, "CREATE (:MyLabel)" ); - tx.markTerminated(); + tx.markTerminated( null ); ClientException e = assertThrows( ClientException.class, () -> txRun( tx, "CREATE (:MyOtherLabel)" ) ); assertThat( e.getMessage(), startsWith( "Cannot run more queries in this transaction" ) ); @@ -183,7 +183,7 @@ void shouldFailToRunQueryWhenTerminated() void shouldBePossibleToRunMoreTransactionsAfterOneIsTerminated() { UnmanagedTransaction tx1 = beginTransaction(); - tx1.markTerminated(); + tx1.markTerminated( null ); // commit should fail, make session forget about this transaction and release the connection to the pool ClientException e = assertThrows( ClientException.class, () -> await( tx1.commitAsync() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java index 8404d1682f..e30f0d27bd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/InternalAsyncTransactionTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.summary.ResultSummary; @@ -64,6 +65,7 @@ class InternalAsyncTransactionTest { private Connection connection; + private NetworkSession networkSession; private InternalAsyncTransaction tx; @BeforeEach @@ -73,7 +75,8 @@ void setUp() ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); when( connectionProvider.acquireConnection( any( ConnectionContext.class ) ) ) .thenReturn( completedFuture( connection ) ); - InternalAsyncSession session = new InternalAsyncSession( newSession( connectionProvider ) ); + networkSession = newSession(connectionProvider); + InternalAsyncSession session = new InternalAsyncSession(networkSession); tx = (InternalAsyncTransaction) await( session.beginTransactionAsync() ); } @@ -91,7 +94,7 @@ private static Stream>> @ParameterizedTest @MethodSource( "allSessionRunMethods" ) - void shouldFlushOnRun( Function> runReturnOne ) throws Throwable + void shouldFlushOnRun( Function> runReturnOne ) { setupSuccessfulRunAndPull( connection ); @@ -102,7 +105,7 @@ void shouldFlushOnRun( Function> } @Test - void shouldCommit() throws Throwable + void shouldCommit() { await( tx.commitAsync() ); @@ -112,7 +115,7 @@ void shouldCommit() throws Throwable } @Test - void shouldRollback() throws Throwable + void shouldRollback() { await( tx.rollbackAsync() ); @@ -122,9 +125,9 @@ void shouldRollback() throws Throwable } @Test - void shouldRollbackWhenFailedRun() throws Throwable + void shouldRollbackWhenFailedRun() { - tx.markTerminated(); + Futures.blockingGet( networkSession.resetAsync() ); ClientException clientException = assertThrows( ClientException.class, () -> await( tx.commitAsync() ) ); assertThat( clientException.getMessage(), containsString( "It has been rolled back either because of an error or explicit termination" ) ); @@ -133,7 +136,7 @@ void shouldRollbackWhenFailedRun() throws Throwable } @Test - void shouldReleaseConnectionWhenFailedToCommit() throws Throwable + void shouldReleaseConnectionWhenFailedToCommit() { setupFailingCommit( connection ); assertThrows( Exception.class, () -> await( tx.commitAsync() ) ); @@ -143,7 +146,7 @@ void shouldReleaseConnectionWhenFailedToCommit() throws Throwable } @Test - void shouldReleaseConnectionWhenFailedToRollback() throws Throwable + void shouldReleaseConnectionWhenFailedToRollback() { setupFailingRollback( connection ); assertThrows( Exception.class, () -> await( tx.rollbackAsync() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java index a1b6249ae4..5ec591906c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/UnmanagedTransactionTest.java @@ -144,7 +144,7 @@ void shouldBeClosedWhenMarkedAsTerminated() { UnmanagedTransaction tx = beginTx( connectionMock() ); - tx.markTerminated(); + tx.markTerminated( null ); assertTrue( tx.isOpen() ); } @@ -154,7 +154,7 @@ void shouldBeClosedWhenMarkedTerminatedAndClosed() { UnmanagedTransaction tx = beginTx( connectionMock() ); - tx.markTerminated(); + tx.markTerminated( null ); await( tx.closeAsync() ); assertFalse( tx.isOpen() ); @@ -196,7 +196,7 @@ void shouldReleaseConnectionWhenTerminatedAndCommitted() Connection connection = connectionMock(); UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); - tx.markTerminated(); + tx.markTerminated( null ); assertThrows( ClientException.class, () -> await( tx.commitAsync() ) ); @@ -210,7 +210,7 @@ void shouldReleaseConnectionWhenTerminatedAndRolledBack() Connection connection = connectionMock(); UnmanagedTransaction tx = new UnmanagedTransaction( connection, new DefaultBookmarkHolder(), UNLIMITED_FETCH_SIZE ); - tx.markTerminated(); + tx.markTerminated( null ); await( tx.rollbackAsync() ); verify( connection ).release(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java index ff4618371f..910c146e89 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListenerTest.java @@ -73,6 +73,6 @@ private static void testErrorHandling( Throwable error ) handler.onFailure( error ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java index c6306aa5ef..9b793f40e4 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/pulln/TransactionPullResponseCompletionListenerTest.java @@ -73,7 +73,7 @@ protected void shouldHandleFailure( BasicPullResponseHandler.State state ) // Then assertThat( handler.state(), equalTo( BasicPullResponseHandler.State.FAILURE_STATE ) ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); verify( recordConsumer ).accept( null, error ); verify( summaryConsumer ).accept( any( ResultSummary.class ), eq( error ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java index 99fc715513..656a8dddb8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalRxTransactionTest.java @@ -135,6 +135,6 @@ void shouldMarkTxIfFailedToRun( Function runReturnOne ) verify( tx ).runRx( any( Query.class ) ); RuntimeException t = assertThrows( CompletionException.class, () -> Futures.getNow( cursorFuture ) ); assertThat( t.getCause(), equalTo( error ) ); - verify( tx ).markTerminated(); + verify( tx ).markTerminated( error ); } } From bb98725c8111ee586da2b44160ba1d8bd270f107 Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Mon, 8 Jun 2020 19:50:39 +0200 Subject: [PATCH 2/6] Polishing. This removes the circular dependency between holder and state. I had introduced it cause I wanted to avoid object allocation. This solution here is better: It also creates only 3 different holders for non terminated states, but without the circular dependency. --- .../internal/async/UnmanagedTransaction.java | 49 +++++++++++++------ 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java index 83da1ec368..a9c504cd58 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java @@ -56,22 +56,18 @@ private enum State COMMITTED, /** This transaction has been rolled back */ - ROLLED_BACK; - - private final StateHolder defaultStateHolder; - - State() - { - this.defaultStateHolder = new StateHolder(this, null); - } + ROLLED_BACK } - private static final EnumSet OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED ); - /** * This is a holder so that we can have ony the state volatile in the tx without having to synchronize the whole block. */ - private static final class StateHolder { + private static final class StateHolder + { + private static final EnumSet OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED ); + private static final StateHolder ACTIVE_HOLDER = new StateHolder( State.ACTIVE, null ); + private static final StateHolder COMMITTED_HOLDER = new StateHolder( State.COMMITTED, null ); + private static final StateHolder ROLLED_BACK_HOLDER = new StateHolder( State.ROLLED_BACK, null ); /** * The actual state. @@ -83,8 +79,29 @@ private static final class StateHolder { */ final Throwable causeOfTermination; - StateHolder(State value, Throwable causeOfTermination) { + static StateHolder of( State value ) + { + switch ( value ) + { + case ACTIVE: + return ACTIVE_HOLDER; + case COMMITTED: + return COMMITTED_HOLDER; + case ROLLED_BACK: + return ROLLED_BACK_HOLDER; + case TERMINATED: + default: + throw new IllegalArgumentException( "Cannot provide a default state holder for state " + value ); + } + } + + static StateHolder terminatedWith( Throwable cause ) + { + return new StateHolder( State.TERMINATED, cause ); + } + private StateHolder( State value, Throwable causeOfTermination ) + { this.value = value; this.causeOfTermination = causeOfTermination; } @@ -101,7 +118,7 @@ boolean isOpen() private final ResultCursorsHolder resultCursors; private final long fetchSize; - private volatile StateHolder state = new StateHolder(State.ACTIVE, null); + private volatile StateHolder state = StateHolder.of( State.ACTIVE ); public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize ) { @@ -200,7 +217,7 @@ public boolean isOpen() public void markTerminated( Throwable cause ) { - state = new StateHolder(State.TERMINATED, cause); + state = StateHolder.terminatedWith( cause ); } public Connection connection() @@ -261,11 +278,11 @@ private void transactionClosed( boolean isCommitted ) { if ( isCommitted ) { - state = State.COMMITTED.defaultStateHolder; + state = StateHolder.of( State.COMMITTED ); } else { - state = State.ROLLED_BACK.defaultStateHolder; + state = StateHolder.of( State.ROLLED_BACK ); } connection.release(); // release in background } From 3f4d5d855b549f10146a60687094e2deb8452c3d Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Mon, 8 Jun 2020 20:15:41 +0200 Subject: [PATCH 3/6] Add tests. --- .../ExponentialBackoffRetryLogicTest.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index 513ac2a9b1..5b5f26d325 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.retry; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +43,7 @@ import org.neo4j.driver.Logger; import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -762,6 +765,50 @@ void doesNotCollectSuppressedErrorsWhenSameErrorIsThrownRx() assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCause() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = logic.retry( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return "Done"; + } ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientException() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> logic.retry( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return "Done"; + } ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLogged() { @@ -781,6 +828,52 @@ void eachRetryIsLogged() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCauseAsync() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = await( logic.retryAsync( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return CompletableFuture.completedFuture( "Done" ); + } ) ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientExceptionAsync() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> await( logic.retryAsync( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return CompletableFuture.completedFuture( "Done" ); + } ) ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLoggedAsync() { @@ -802,6 +895,52 @@ void eachRetryIsLoggedAsync() ); } + @Test + void doesRetryOnClientExceptionWithRetryableCauseRx() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + String result = await( Mono.from( logic.retryRx( Mono.fromSupplier( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw clientExceptionWithValidTerminationCause(); + } + return "Done"; + } ) ) ) ); + + assertEquals( "Done", result ); + } + + @Test + void doesNotRetryOnRandomClientExceptionRx() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); + + AtomicBoolean exceptionThrown = new AtomicBoolean( false ); + ClientException exception = Assertions.assertThrows( ClientException.class, () -> await( Mono.from( logic.retryRx( Mono.fromSupplier( () -> + { + if ( exceptionThrown.compareAndSet( false, true ) ) + { + throw randomClientException(); + } + return "Done"; + } ) ) ) ) ); + + assertEquals( "Meeh", exception.getMessage() ); + } + @Test void eachRetryIsLoggedRx() { @@ -1111,6 +1250,16 @@ private static ServiceUnavailableException serviceUnavailable() return new ServiceUnavailableException( "" ); } + private static RuntimeException clientExceptionWithValidTerminationCause() + { + return new ClientException( "¯\\_(ツ)_/¯", serviceUnavailable() ); + } + + private static RuntimeException randomClientException() + { + return new ClientException( "Meeh" ); + } + private static SessionExpiredException sessionExpired() { return new SessionExpiredException( "" ); From 3f2714b72e02accb6a4506a4964e650a934395a2 Mon Sep 17 00:00:00 2001 From: Gregory Woods Date: Mon, 8 Jun 2020 21:20:38 +0100 Subject: [PATCH 4/6] Bolt stub test --- .../integration/RoutingDriverBoltKitTest.java | 36 +++++++++++++++++++ .../acquire_endpoints_twice_v4.script | 16 +++++++++ ...ble_to_write_server_tx_func_retries.script | 25 +++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 driver/src/test/resources/acquire_endpoints_twice_v4.script create mode 100644 driver/src/test/resources/not_able_to_write_server_tx_func_retries.script diff --git a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java index 36afe38eb2..24e2fe3628 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -534,6 +535,41 @@ void shouldHandleLeaderSwitchWhenWritingInTransaction() throws IOException, Inte assertThat( server.exitStatus(), equalTo( 0 ) ); } + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunction() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + //START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + List names; + + try ( Session session = driver.session( builder().withDatabase( "mydatabase" ).build() ) ) + { + names = session.writeTransaction( tx -> { + tx.run( "RETURN 1"); + try + { + Thread.sleep( 100 ); + } + catch ( InterruptedException ignored ) + { + } + return tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() ); + } ); + } + + assertEquals( asList( "Foo", "Bar" ), names ); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + @Test void shouldSendInitialBookmark() throws Exception { diff --git a/driver/src/test/resources/acquire_endpoints_twice_v4.script b/driver/src/test/resources/acquire_endpoints_twice_v4.script new file mode 100644 index 0000000000..f92363e573 --- /dev/null +++ b/driver/src/test/resources/acquire_endpoints_twice_v4.script @@ -0,0 +1,16 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO HELLO +!: AUTO GOODBYE + +C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "127.0.0.1:9001"}, "database": "mydatabase"} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "127.0.0.1:9001"}, "database": "mydatabase"} {"mode": "r", "db": "system"} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} + diff --git a/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script b/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script new file mode 100644 index 0000000000..447bcbf7fb --- /dev/null +++ b/driver/src/test/resources/not_able_to_write_server_tx_func_retries.script @@ -0,0 +1,25 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO BEGIN +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO ROLLBACK + +C: RUN "RETURN 1" {} {} + PULL {"n": 1000} +S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} + IGNORED +C: RUN "RETURN 1" {} {} + PULL {"n": 1000} +S: SUCCESS {"fields": ["1"]} + RECORD [1] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} {} + PULL {"n": 1000} +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Foo"] + RECORD ["Bar"] + SUCCESS {} +C: COMMIT +S: SUCCESS {"bookmark": "NewBookmark"} + From ce4933147ffaa6be7a60a05a2b2164558ec5436c Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Tue, 9 Jun 2020 12:09:40 +0200 Subject: [PATCH 5/6] Add asynchronous and reactive tests. --- .../integration/RoutingDriverBoltKitTest.java | 152 +++++++++++++----- ..._to_write_server_tx_func_retries_rx.script | 25 +++ 2 files changed, 139 insertions(+), 38 deletions(-) create mode 100644 driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script diff --git a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java index 24e2fe3628..f395d09910 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java @@ -21,6 +21,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.io.IOException; import java.net.URI; @@ -33,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.neo4j.driver.AccessMode; import org.neo4j.driver.AuthToken; @@ -42,10 +46,11 @@ import org.neo4j.driver.GraphDatabase; import org.neo4j.driver.Logger; import org.neo4j.driver.Record; -import org.neo4j.driver.Session; import org.neo4j.driver.Result; +import org.neo4j.driver.Session; import org.neo4j.driver.Transaction; import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.exceptions.TransientException; @@ -56,9 +61,12 @@ import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.util.DriverFactoryWithClock; import org.neo4j.driver.internal.util.DriverFactoryWithFixedRetryLogic; +import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.internal.util.SleeplessClock; import org.neo4j.driver.net.ServerAddress; import org.neo4j.driver.net.ServerAddressResolver; +import org.neo4j.driver.reactive.RxResult; +import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.util.StubServer; import org.neo4j.driver.util.StubServerController; @@ -110,7 +118,8 @@ void shouldHandleAcquireReadSession() throws IOException, InterruptedException, StubServer readServer = stubController.startStub( "read_server_v3_read.script", 9005 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); - try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) + try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) { List result = session.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ); @@ -134,8 +143,7 @@ void shouldHandleAcquireReadTransaction() throws IOException, InterruptedExcepti Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) { - List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) - .list( record -> record.get( "n.name" ).asString() ) ); + List result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ) ); assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } @@ -154,8 +162,7 @@ void shouldHandleAcquireReadSessionAndTransaction() throws IOException, Interrup StubServer readServer = stubController.startStub( "read_server_v3_read_tx.script", 9005 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) { List result = tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ); @@ -210,7 +217,8 @@ void shouldRoundRobinReadServersWhenUsingTransaction() throws IOException, Inter // Run twice, one on each read server for ( int i = 0; i < 2; i++ ) { - try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) + try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); + Transaction tx = session.beginTransaction() ) { assertThat( tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( "n.name" ).asString() ), equalTo( asList( "Bob", "Alice", "Tina" ) ) ); @@ -235,7 +243,8 @@ void shouldThrowSessionExpiredIfReadServerDisappears() throws IOException, Inter URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect - assertThrows( SessionExpiredException.class, () -> { + assertThrows( SessionExpiredException.class, () -> + { try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) ) { @@ -257,7 +266,8 @@ void shouldThrowSessionExpiredIfReadServerDisappearsWhenUsingTransaction() throw URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect - SessionExpiredException e = assertThrows( SessionExpiredException.class, () -> { + SessionExpiredException e = assertThrows( SessionExpiredException.class, () -> + { try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ); Transaction tx = session.beginTransaction() ) @@ -306,8 +316,7 @@ void shouldThrowSessionExpiredIfWriteServerDisappearsWhenUsingTransaction() thro URI uri = URI.create( "neo4j://127.0.0.1:9001" ); //Expect try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); Transaction tx = session.beginTransaction() ) { assertThrows( SessionExpiredException.class, () -> tx.run( "MATCH (n) RETURN n.name" ).consume() ); } @@ -365,8 +374,7 @@ void shouldHandleAcquireWriteSessionAndTransaction() throws IOException, Interru StubServer writeServer = stubController.startStub( "write_server_v3_write_tx.script", 9007 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); try ( Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); - Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); - Transaction tx = session.beginTransaction() ) + Session session = driver.session( builder().withDefaultAccessMode( AccessMode.WRITE ).build() ); Transaction tx = session.beginTransaction() ) { tx.run( "CREATE (n {name:'Bob'})" ); tx.commit(); @@ -541,7 +549,7 @@ void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunction() throws IOExceptio // Given StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); - //START a write server that fails on the first write attempt but then succeeds on the second + // START a write server that fails on the first write attempt but then succeeds on the second StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 ); URI uri = URI.create( "neo4j://127.0.0.1:9001" ); @@ -550,22 +558,93 @@ void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunction() throws IOExceptio try ( Session session = driver.session( builder().withDatabase( "mydatabase" ).build() ) ) { - names = session.writeTransaction( tx -> { - tx.run( "RETURN 1"); + names = session.writeTransaction( tx -> + { + tx.run( "RETURN 1" ); try { Thread.sleep( 100 ); } - catch ( InterruptedException ignored ) + catch ( InterruptedException ex ) { } - return tx.run( "MATCH (n) RETURN n.name" ).list( record -> record.get( 0 ).asString() ); + return tx.run( "MATCH (n) RETURN n.name" ).list( RoutingDriverBoltKitTest::extractNameField ); } ); } assertEquals( asList( "Foo", "Bar" ), names ); // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionAsync() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + // START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + AsyncSession session = driver.asyncSession( builder().withDatabase( "mydatabase" ).build() ); + List names = Futures.blockingGet( session.writeTransactionAsync( + tx -> tx.runAsync( "RETURN 1" ) + .thenComposeAsync( ignored -> { + try + { + Thread.sleep( 100 ); + } + catch ( InterruptedException ex ) + { + } + return tx.runAsync( "MATCH (n) RETURN n.name" ); + } ) + .thenComposeAsync( cursor -> cursor.listAsync( RoutingDriverBoltKitTest::extractNameField ) ) ) ); + + assertEquals( asList( "Foo", "Bar" ), names ); + + // Finally + driver.close(); + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer.exitStatus(), equalTo( 0 ) ); + } + + private static String extractNameField(Record record) + { + return record.get( 0 ).asString(); + } + + // This does not exactly reproduce the async and blocking versions above, as we don't have any means of ignoring + // the flux of the RETURN 1 query (not pulling the result) like we do in above, so + @Test + void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionRX() throws IOException, InterruptedException + { + // Given + StubServer server = stubController.startStub( "acquire_endpoints_twice_v4.script", 9001 ); + + // START a write server that fails on the first write attempt but then succeeds on the second + StubServer writeServer = stubController.startStub( "not_able_to_write_server_tx_func_retries_rx.script", 9007 ); + URI uri = URI.create( "neo4j://127.0.0.1:9001" ); + + Driver driver = GraphDatabase.driver( uri, Config.builder().withMaxTransactionRetryTime( 1, TimeUnit.MILLISECONDS ).build() ); + + Flux fluxOfNames = Flux.usingWhen( Mono.fromSupplier( () -> driver.rxSession( builder().withDatabase( "mydatabase" ).build() ) ), + session -> session.writeTransaction( tx -> + { + RxResult result = tx.run( "RETURN 1" ); + return Flux.from( result.records() ).limitRate( 100 ).thenMany( tx.run( "MATCH (n) RETURN n.name" ).records() ).limitRate( 100 ).map( + RoutingDriverBoltKitTest::extractNameField ); + } ), RxSession::close ); + + StepVerifier.create( fluxOfNames ).expectNext( "Foo", "Bar" ).verifyComplete(); + + // Finally + driver.close(); assertThat( server.exitStatus(), equalTo( 0 ) ); assertThat( writeServer.exitStatus(), equalTo( 0 ) ); } @@ -732,8 +811,7 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc Logger logger = mock( Logger.class ); Config config = insecureBuilder().withLogging( ignored -> logger ).build(); - try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); List records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ); @@ -766,8 +844,7 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemovedV3() throws E Logger logger = mock( Logger.class ); Config config = insecureBuilder().withLogging( ignored -> logger ).build(); - try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9001", config ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); List records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) ); @@ -793,8 +870,7 @@ void shouldRetryReadTransactionUntilFailure() throws Exception StubServer brokenReader1 = stubController.startStub( "dead_read_server_tx.script", 9005 ); StubServer brokenReader2 = stubController.startStub( "dead_read_server_tx.script", 9006 ); - try ( Driver driver = newDriverWithFixedRetries( "neo4j://127.0.0.1:9001", 1 ); - Session session = driver.session() ) + try ( Driver driver = newDriverWithFixedRetries( "neo4j://127.0.0.1:9001", 1 ); Session session = driver.session() ) { AtomicInteger invocations = new AtomicInteger(); assertThrows( SessionExpiredException.class, () -> session.readTransaction( queryWork( "MATCH (n) RETURN n.name", invocations ) ) ); @@ -954,8 +1030,7 @@ void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception StubServer router2 = stubController.startStub( "discover_no_writers_9010.script", 9004 ); StubServer reader = stubController.startStub( "read_server_v3_read_tx.script", 9003 ); - try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG ); - Session session = driver.session() ) + try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG ); Session session = driver.session() ) { assertEquals( asList( "Bob", "Alice", "Tina" ), readStrings( "MATCH (n) RETURN n.name", session ) ); @@ -1038,10 +1113,9 @@ void shouldSendMultipleBookmarks() throws Exception StubServer router = stubController.startStub( "acquire_endpoints_v3.script", 9001 ); StubServer writer = stubController.startStub( "multiple_bookmarks.script", 9007 ); - try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG ); - Session session = driver.session( builder().withBookmarks( InternalBookmark.parse( - asOrderedSet( "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx29", "neo4j:bookmark:v1:tx94", "neo4j:bookmark:v1:tx56", - "neo4j:bookmark:v1:tx16", "neo4j:bookmark:v1:tx68" ) ) ).build() ) ) + try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG ); Session session = driver.session( builder().withBookmarks( + InternalBookmark.parse( asOrderedSet( "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx29", "neo4j:bookmark:v1:tx94", "neo4j:bookmark:v1:tx56", + "neo4j:bookmark:v1:tx16", "neo4j:bookmark:v1:tx68" ) ) ).build() ) ) { try ( Transaction tx = session.beginTransaction() ) { @@ -1113,7 +1187,8 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio StubServer reader = stubController.startStub( "read_server_v3_read_tx.script", 9005 ); AtomicBoolean resolverInvoked = new AtomicBoolean(); - ServerAddressResolver resolver = address -> { + ServerAddressResolver resolver = address -> + { if ( resolverInvoked.compareAndSet( false, true ) ) { // return the address first time @@ -1135,13 +1210,11 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio try ( Session session = driver.session() ) { // run first query against 9001, which should return result and exit - List names1 = session.run( "MATCH (n) RETURN n.name AS name" ) - .list( record -> record.get( "name" ).asString() ); + List names1 = session.run( "MATCH (n) RETURN n.name AS name" ).list( record -> record.get( "name" ).asString() ); assertEquals( asList( "Alice", "Bob", "Eve" ), names1 ); // run second query with retries, it should rediscover using 9042 returned by the resolver and read from 9005 - List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) - .list( record -> record.get( 0 ).asString() ) ); + List names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list( RoutingDriverBoltKitTest::extractNameField ) ); assertEquals( asList( "Bob", "Alice", "Tina" ), names2 ); } } @@ -1183,7 +1256,8 @@ void useSessionAfterDriverIsClosed() throws Exception @Test void shouldRevertToInitialRouterIfKnownRouterThrowsProtocolErrors() throws Exception { - ServerAddressResolver resolver = a -> { + ServerAddressResolver resolver = a -> + { SortedSet addresses = new TreeSet<>( new PortBasedServerAddressComparator() ); addresses.add( ServerAddress.of( "127.0.0.1", 9001 ) ); addresses.add( ServerAddress.of( "127.0.0.1", 9003 ) ); @@ -1269,7 +1343,8 @@ private static Driver newDriver( String uriString, DriverFactory driverFactory, private static TransactionWork> queryWork( final String query, final AtomicInteger invocations ) { - return tx -> { + return tx -> + { invocations.incrementAndGet(); return tx.run( query ).list(); }; @@ -1277,7 +1352,8 @@ private static TransactionWork> queryWork( final String query, fina private static List readStrings( final String query, Session session ) { - return session.readTransaction( tx -> { + return session.readTransaction( tx -> + { List records = tx.run( query ).list(); List names = new ArrayList<>( records.size() ); for ( Record record : records ) diff --git a/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script b/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script new file mode 100644 index 0000000000..b2d651000c --- /dev/null +++ b/driver/src/test/resources/not_able_to_write_server_tx_func_retries_rx.script @@ -0,0 +1,25 @@ +!: BOLT 4 +!: AUTO RESET +!: AUTO BEGIN +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO ROLLBACK + +C: RUN "RETURN 1" {} {} +S: SUCCESS {"fields": ["1"]} +C: PULL {"n": 100} +S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} +C: RUN "RETURN 1" {} {} +S: SUCCESS {"fields": ["1"]} +C: PULL {"n": 100} +S: RECORD [1] + SUCCESS {"has_more": false} +C: RUN "MATCH (n) RETURN n.name" {} {} +S: SUCCESS {"fields": ["n.name"]} +C: PULL {"n": 100} +S: RECORD ["Foo"] + RECORD ["Bar"] + SUCCESS {"has_more": false} +C: COMMIT +S: SUCCESS {"bookmark": "NewBookmark"} + From bc37bf161d5e2719a4811ba08d74f99bc51faea7 Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Tue, 9 Jun 2020 13:51:55 +0200 Subject: [PATCH 6/6] Fix comment. --- .../org/neo4j/driver/integration/RoutingDriverBoltKitTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java index f395d09910..54ab401551 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java @@ -620,7 +620,8 @@ private static String extractNameField(Record record) } // This does not exactly reproduce the async and blocking versions above, as we don't have any means of ignoring - // the flux of the RETURN 1 query (not pulling the result) like we do in above, so + // the flux of the RETURN 1 query (not pulling the result) like we do in above, so this is "just" a test for + // a leader going away during the execution of a flux. @Test void shouldHandleLeaderSwitchAndRetryWhenWritingInTxFunctionRX() throws IOException, InterruptedException {