diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 33c32414b2..0bbe5fdc71 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -289,32 +289,34 @@ CompletionStage currentConnectionIsOpen() private T transaction( AccessMode mode, TransactionWork work ) { - return getBlocking( transactionAsync( mode, tx -> + // use different code path compared to async so that work is executed in the caller thread + // caller thread will also be the one who sleeps between retries; + // it is unsafe to execute retries in the event loop threads because this can cause a deadlock + // event loop thread will bock and wait for itself to read some data + return retryLogic.retry( () -> { - try - { - // todo: given lambda can't be executed in even loop thread because it deadlocks - // todo: event loop executes a blocking operation and waits for itself to read from the network - // todo: this is most likely what happens... - - // todo: use of supplyAsync is a hack and it makes blocking API very different from 1.4 - // todo: because we now execute function in FJP.commonPool() - - // todo: bring back blocking retries with sleeps and etc. so that we execute TxWork in caller thread - return CompletableFuture.supplyAsync( () -> work.execute( tx ) ); -// T result = work.execute( tx ); -// return completedFuture( result ); - } - catch ( Throwable error ) + try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) ) { - return failedFuture( error ); + try + { + T result = work.execute( tx ); + tx.success(); + return result; + } + catch ( Throwable t ) + { + // mark transaction for failure if the given unit of work threw exception + // this will override any success marks that were made by the unit of work + tx.failure(); + throw t; + } } - } ) ); + } ); } private CompletionStage transactionAsync( AccessMode mode, TransactionWork> work ) { - return retryLogic.retry( () -> + return retryLogic.retryAsync( () -> { CompletableFuture resultFuture = new CompletableFuture<>(); CompletionStage txFuture = beginTransactionAsync( mode ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index a794ad480c..424dbb26c6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -80,7 +80,48 @@ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup } @Override - public CompletionStage retry( Supplier> work ) + public T retry( Supplier work ) + { + List errors = null; + long startTime = -1; + long nextDelayMs = initialRetryDelayMs; + + while ( true ) + { + try + { + return work.get(); + } + catch ( Throwable error ) + { + if ( canRetryOn( error ) ) + { + long currentTime = clock.millis(); + if ( startTime == -1 ) + { + startTime = currentTime; + } + + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); + log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error ); + + sleep( delayWithJitterMs ); + nextDelayMs = (long) (nextDelayMs * multiplier); + errors = recordError( error, errors ); + continue; + } + } + addSuppressed( error, errors ); + throw error; + } + } + } + + @Override + public CompletionStage retryAsync( Supplier> work ) { CompletableFuture resultFuture = new CompletableFuture<>(); executeWorkInEventLoop( resultFuture, work ); @@ -109,7 +150,7 @@ private void retryWorkInEventLoop( CompletableFuture resultFuture, Suppli EventExecutor eventExecutor = eventExecutorGroup.next(); long delayWithJitterMs = computeDelayWithJitter( delayMs ); - log.warn( "Transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); + log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); eventExecutor.schedule( () -> { @@ -185,6 +226,19 @@ private long computeDelayWithJitter( long delayMs ) return ThreadLocalRandom.current().nextLong( min, max + 1 ); } + private void sleep( long delayMs ) + { + try + { + clock.sleep( delayMs ); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + throw new IllegalStateException( "Retries interrupted", e ); + } + } + private void verifyAfterConstruction() { if ( maxRetryTimeMs < 0 ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java index c728dc531e..29c23ce8ea 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java @@ -24,5 +24,7 @@ public interface RetryLogic { - CompletionStage retry( Supplier> work ); + T retry( Supplier work ); + + CompletionStage retryAsync( Supplier> work ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index b830f21e82..bde00cd30a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.retry; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.List; @@ -40,17 +41,23 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.startsWith; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.util.Futures.failedFuture; +import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.v1.util.TestUtil.await; public class ExponentialBackoffRetryLogicTest @@ -144,7 +151,22 @@ public void throwsForIllegalClock() } @Test - public void nextDelayCalculatedAccordingToMultiplier() + public void nextDelayCalculatedAccordingToMultiplier() throws Exception + { + int retries = 27; + int initialDelay = 1; + int multiplier = 3; + int noJitter = 0; + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, noJitter, clock ); + + retry( logic, retries ); + + assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), sleepValues( clock, retries ) ); + } + + @Test + public void nextDelayCalculatedAccordingToMultiplierAsync() { String result = "The Result"; int retries = 14; @@ -155,14 +177,33 @@ public void nextDelayCalculatedAccordingToMultiplier() ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, noJitter, Clock.SYSTEM ); - CompletionStage future = retry( retryLogic, retries, result ); + CompletionStage future = retryAsync( retryLogic, retries, result ); assertEquals( result, Futures.getBlocking( future ) ); assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), eventExecutor.scheduleDelays() ); } @Test - public void nextDelayCalculatedAccordingToJitter() + public void nextDelayCalculatedAccordingToJitter() throws Exception + { + int retries = 32; + double jitterFactor = 0.2; + int initialDelay = 1; + int multiplier = 2; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic logic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, jitterFactor, clock ); + + retry( logic, retries ); + + List sleepValues = sleepValues( clock, retries ); + List delaysWithoutJitter = delaysWithoutJitter( initialDelay, multiplier, retries ); + + assertDelaysApproximatelyEqual( delaysWithoutJitter, sleepValues, jitterFactor ); + } + + @Test + public void nextDelayCalculatedAccordingToJitterAsync() { String result = "The Result"; int retries = 24; @@ -173,7 +214,7 @@ public void nextDelayCalculatedAccordingToJitter() ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, jitterFactor, mock( Clock.class ) ); - CompletionStage future = retry( retryLogic, retries, result ); + CompletionStage future = retryAsync( retryLogic, retries, result ); assertEquals( result, Futures.getBlocking( future ) ); List scheduleDelays = eventExecutor.scheduleDelays(); @@ -183,7 +224,40 @@ public void nextDelayCalculatedAccordingToJitter() } @Test - public void doesNotRetryWhenMaxRetryTimeExceeded() + public void doesNotRetryWhenMaxRetryTimeExceeded() throws Exception + { + long retryStart = Clock.SYSTEM.millis(); + int initialDelay = 100; + int multiplier = 2; + long maxRetryTimeMs = 45; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( retryStart ) + .thenReturn( retryStart + maxRetryTimeMs - 5 ) + .thenReturn( retryStart + maxRetryTimeMs + 7 ); + + ExponentialBackoffRetryLogic logic = newRetryLogic( maxRetryTimeMs, initialDelay, multiplier, 0, clock ); + + Supplier workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenThrow( error ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( clock ).sleep( initialDelay ); + verify( clock ).sleep( initialDelay * multiplier ); + verify( workMock, times( 3 ) ).get(); + } + + @Test + public void doesNotRetryWhenMaxRetryTimeExceededAsync() { long retryStart = Clock.SYSTEM.millis(); int initialDelay = 100; @@ -200,7 +274,7 @@ public void doesNotRetryWhenMaxRetryTimeExceeded() SessionExpiredException error = sessionExpired(); when( workMock.get() ).thenReturn( failedFuture( error ) ); - CompletionStage future = retryLogic.retry( workMock ); + CompletionStage future = retryLogic.retryAsync( workMock ); try { @@ -221,7 +295,23 @@ public void doesNotRetryWhenMaxRetryTimeExceeded() } @Test - public void schedulesRetryOnServiceUnavailableException() + public void sleepsOnServiceUnavailableException() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 42, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + ServiceUnavailableException error = serviceUnavailable(); + when( workMock.get() ).thenThrow( error ).thenReturn( null ); + + assertNull( logic.retry( workMock ) ); + + verify( workMock, times( 2 ) ).get(); + verify( clock ).sleep( 42 ); + } + + @Test + public void schedulesRetryOnServiceUnavailableExceptionAsync() { String result = "The Result"; Clock clock = mock( Clock.class ); @@ -232,7 +322,7 @@ public void schedulesRetryOnServiceUnavailableException() SessionExpiredException error = sessionExpired(); when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( completedFuture( result ) ); - assertEquals( result, await( retryLogic.retry( workMock ) ) ); + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); verify( workMock, times( 2 ) ).get(); List scheduleDelays = eventExecutor.scheduleDelays(); @@ -241,7 +331,23 @@ public void schedulesRetryOnServiceUnavailableException() } @Test - public void schedulesRetryOnSessionExpiredException() + public void sleepsOnSessionExpiredException() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 4242, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenThrow( error ).thenReturn( null ); + + assertNull( logic.retry( workMock ) ); + + verify( workMock, times( 2 ) ).get(); + verify( clock ).sleep( 4242 ); + } + + @Test + public void schedulesRetryOnSessionExpiredExceptionAsync() { String result = "The Result"; Clock clock = mock( Clock.class ); @@ -252,7 +358,7 @@ public void schedulesRetryOnSessionExpiredException() SessionExpiredException error = sessionExpired(); when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( completedFuture( result ) ); - assertEquals( result, await( retryLogic.retry( workMock ) ) ); + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); verify( workMock, times( 2 ) ).get(); List scheduleDelays = eventExecutor.scheduleDelays(); @@ -261,7 +367,23 @@ public void schedulesRetryOnSessionExpiredException() } @Test - public void schedulesRetryOnTransientException() + public void sleepsOnTransientException() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 23, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + TransientException error = transientException(); + when( workMock.get() ).thenThrow( error ).thenReturn( null ); + + assertNull( logic.retry( workMock ) ); + + verify( workMock, times( 2 ) ).get(); + verify( clock ).sleep( 23 ); + } + + @Test + public void schedulesRetryOnTransientExceptionAsync() { String result = "The Result"; Clock clock = mock( Clock.class ); @@ -272,7 +394,7 @@ public void schedulesRetryOnTransientException() TransientException error = transientException(); when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( completedFuture( result ) ); - assertEquals( result, await( retryLogic.retry( workMock ) ) ); + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); verify( workMock, times( 2 ) ).get(); List scheduleDelays = eventExecutor.scheduleDelays(); @@ -281,7 +403,31 @@ public void schedulesRetryOnTransientException() } @Test - public void doesNotRetryOnUnknownError() + public void throwsWhenUnknownError() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 1, 1, 1, clock ); + + Supplier workMock = newWorkMock(); + IllegalStateException error = new IllegalStateException(); + when( workMock.get() ).thenThrow( error ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + verify( clock, never() ).sleep( anyLong() ); + } + + @Test + public void doesNotRetryOnUnknownErrorAsync() { Clock clock = mock( Clock.class ); @@ -293,7 +439,7 @@ public void doesNotRetryOnUnknownError() try { - await( retryLogic.retry( workMock ) ); + await( retryLogic.retryAsync( workMock ) ); fail( "Exception expected" ); } catch ( Exception e ) @@ -306,7 +452,31 @@ public void doesNotRetryOnUnknownError() } @Test - public void doesNotRetryOnTransactionTerminatedError() + public void throwsWhenTransactionTerminatedError() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 13, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.Terminated", "" ); + when( workMock.get() ).thenThrow( error ).thenReturn( null ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + verify( clock, never() ).sleep( 13 ); + } + + @Test + public void doesNotRetryOnTransactionTerminatedErrorAsync() { Clock clock = mock( Clock.class ); @@ -318,7 +488,7 @@ public void doesNotRetryOnTransactionTerminatedError() try { - await( retryLogic.retry( workMock ) ); + await( retryLogic.retryAsync( workMock ) ); fail( "Exception expected" ); } catch ( Exception e ) @@ -331,7 +501,31 @@ public void doesNotRetryOnTransactionTerminatedError() } @Test - public void doesNotRetryOnTransactionLockClientStoppedError() + public void throwsWhenTransactionLockClientStoppedError() throws Exception + { + Clock clock = mock( Clock.class ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 13, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.LockClientStopped", "" ); + when( workMock.get() ).thenThrow( error ).thenReturn( null ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + verify( clock, never() ).sleep( 13 ); + } + + @Test + public void doesNotRetryOnTransactionLockClientStoppedErrorAsync() { Clock clock = mock( Clock.class ); @@ -343,7 +537,7 @@ public void doesNotRetryOnTransactionLockClientStoppedError() try { - await( retryLogic.retry( workMock ) ); + await( retryLogic.retryAsync( workMock ) ); fail( "Exception expected" ); } catch ( Exception e ) @@ -356,7 +550,74 @@ public void doesNotRetryOnTransactionLockClientStoppedError() } @Test - public void collectsSuppressedErrors() + public void throwsWhenSleepInterrupted() throws Exception + { + Clock clock = mock( Clock.class ); + doThrow( new InterruptedException() ).when( clock ).sleep( 1 ); + ExponentialBackoffRetryLogic logic = newRetryLogic( 1, 1, 1, 0, clock ); + + Supplier workMock = newWorkMock(); + when( workMock.get() ).thenThrow( serviceUnavailable() ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalStateException.class ) ); + assertThat( e.getCause(), instanceOf( InterruptedException.class ) ); + } + finally + { + // Clear the interruption flag so all subsequent tests do not see this thread as interrupted + Thread.interrupted(); + } + } + + @Test + public void collectsSuppressedErrors() throws Exception + { + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 15L ).thenReturn( 25L ); + ExponentialBackoffRetryLogic logic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier workMock = newWorkMock(); + SessionExpiredException error1 = sessionExpired(); + SessionExpiredException error2 = sessionExpired(); + ServiceUnavailableException error3 = serviceUnavailable(); + TransientException error4 = transientException(); + when( workMock.get() ).thenThrow( error1, error2, error3, error4 ).thenReturn( null ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error4, e ); + Throwable[] suppressed = e.getSuppressed(); + assertEquals( 3, suppressed.length ); + assertEquals( error1, suppressed[0] ); + assertEquals( error2, suppressed[1] ); + assertEquals( error3, suppressed[2] ); + } + + verify( workMock, times( 4 ) ).get(); + + verify( clock, times( 3 ) ).sleep( anyLong() ); + verify( clock ).sleep( initialDelay ); + verify( clock ).sleep( initialDelay * multiplier ); + verify( clock ).sleep( initialDelay * multiplier * multiplier ); + } + + @Test + public void collectsSuppressedErrorsAsync() { String result = "The Result"; long maxRetryTime = 20; @@ -381,7 +642,7 @@ public void collectsSuppressedErrors() try { - Futures.getBlocking( retryLogic.retry( workMock ) ); + Futures.getBlocking( retryLogic.retryAsync( workMock ) ); fail( "Exception expected" ); } catch ( Exception e ) @@ -404,7 +665,39 @@ public void collectsSuppressedErrors() } @Test - public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() + public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() throws Exception + { + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 25L ); + ExponentialBackoffRetryLogic logic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenThrow( error ); + + try + { + logic.retry( workMock ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + assertEquals( 0, e.getSuppressed().length ); + } + + verify( workMock, times( 3 ) ).get(); + + verify( clock, times( 2 ) ).sleep( anyLong() ); + verify( clock ).sleep( initialDelay ); + verify( clock ).sleep( initialDelay * multiplier ); + } + + @Test + public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrownAsync() { long maxRetryTime = 20; int initialDelay = 15; @@ -420,7 +713,7 @@ public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() try { - Futures.getBlocking( retryLogic.retry( workMock ) ); + Futures.getBlocking( retryLogic.retryAsync( workMock ) ); fail( "Exception expected" ); } catch ( Exception e ) @@ -439,6 +732,25 @@ public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() @Test public void eachRetryIsLogged() + { + int retries = 9; + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + clock, logging ); + + retry( logic, retries ); + + verify( logger, times( retries ) ).warn( + startsWith( "Transaction failed and will be retried" ), + any( ServiceUnavailableException.class ) + ); + } + + @Test + public void eachRetryIsLoggedAsync() { String result = "The Result"; int retries = 9; @@ -450,18 +762,169 @@ public void eachRetryIsLogged() ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, clock, logging ); - assertEquals( result, await( retry( logic, retries, result ) ) ); + assertEquals( result, await( retryAsync( logic, retries, result ) ) ); verify( logger, times( retries ) ).warn( - startsWith( "Transaction failed and is scheduled to retry" ), + startsWith( "Async transaction failed and is scheduled to retry" ), any( ServiceUnavailableException.class ) ); } - private CompletionStage retry( ExponentialBackoffRetryLogic retryLogic, final int times, - final Object result ) + @Test + public void nothingIsLoggedOnFatalFailure() { - return retryLogic.retry( new Supplier>() + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + mock( Clock.class ), logging ); + + try + { + logic.retry( () -> + { + throw new RuntimeException( "Fatal blocking" ); + } ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( "Fatal blocking", e.getMessage() ); + } + verifyZeroInteractions( logger ); + } + + @Test + public void nothingIsLoggedOnFatalFailureAsync() + { + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + mock( Clock.class ), logging ); + + try + { + getBlocking( logic.retryAsync( () -> failedFuture( new RuntimeException( "Fatal async" ) ) ) ); + fail( "Exception expected" ); + } + catch ( RuntimeException e ) + { + assertEquals( "Fatal async", e.getMessage() ); + } + verifyZeroInteractions( logger ); + } + + @Test + public void correctNumberOfRetiesAreLoggedOnFailure() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + RetrySettings settings = RetrySettings.DEFAULT; + RetryLogic logic = new ExponentialBackoffRetryLogic( settings, eventExecutor, clock, logging ); + + try + { + logic.retry( new Supplier() + { + boolean invoked; + + @Override + public Long get() + { + // work that always fails and moves clock forward after the first failure + if ( invoked ) + { + // move clock forward to stop retries + when( clock.millis() ).thenReturn( settings.maxRetryTimeMs() + 42 ); + } + else + { + invoked = true; + } + throw new ServiceUnavailableException( "Error" ); + } + } ); + fail( "Exception expected" ); + } + catch ( ServiceUnavailableException e ) + { + assertEquals( "Error", e.getMessage() ); + } + verify( logger ).warn( + startsWith( "Transaction failed and will be retried" ), + any( ServiceUnavailableException.class ) + ); + } + + @Test + public void correctNumberOfRetiesAreLoggedOnFailureAsync() + { + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + RetrySettings settings = RetrySettings.DEFAULT; + RetryLogic logic = new ExponentialBackoffRetryLogic( settings, eventExecutor, clock, logging ); + + try + { + getBlocking( logic.retryAsync( new Supplier>() + { + volatile boolean invoked; + + @Override + public CompletionStage get() + { + // work that always returns failed future and moves clock forward after the first failure + if ( invoked ) + { + // move clock forward to stop retries + when( clock.millis() ).thenReturn( settings.maxRetryTimeMs() + 42 ); + } + else + { + invoked = true; + } + return failedFuture( new SessionExpiredException( "Session no longer valid" ) ); + } + } ) ); + fail( "Exception expected" ); + } + catch ( SessionExpiredException e ) + { + assertEquals( "Session no longer valid", e.getMessage() ); + } + verify( logger ).warn( + startsWith( "Async transaction failed and is scheduled to retry" ), + any( SessionExpiredException.class ) + ); + } + + private static void retry( ExponentialBackoffRetryLogic retryLogic, final int times ) + { + retryLogic.retry( new Supplier() + { + int invoked; + + @Override + public Void get() + { + if ( invoked < times ) + { + invoked++; + throw serviceUnavailable(); + } + return null; + } + } ); + } + + private CompletionStage retryAsync( ExponentialBackoffRetryLogic retryLogic, int times, Object result ) + { + return retryLogic.retryAsync( new Supplier>() { int invoked; @@ -491,6 +954,13 @@ private static List delaysWithoutJitter( long initialDelay, double multipl return values; } + private static List sleepValues( Clock clockMock, int expectedCount ) throws InterruptedException + { + ArgumentCaptor captor = ArgumentCaptor.forClass( long.class ); + verify( clockMock, times( expectedCount ) ).sleep( captor.capture() ); + return captor.getAllValues(); + } + private ExponentialBackoffRetryLogic newRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier, double jitterFactor, Clock clock ) { diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index b24abd6542..d7f44729ba 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -59,6 +59,7 @@ import static java.util.Collections.emptyIterator; import static java.util.Collections.emptyMap; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -71,6 +72,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import static org.neo4j.driver.internal.util.Futures.failedFuture; import static org.neo4j.driver.internal.util.Futures.getBlocking; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.internal.util.Matchers.arithmeticError; @@ -720,6 +722,116 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection() assertNull( getBlocking( tx.rollbackAsync() ) ); } + @Test + public void shouldExecuteReadTransactionUntilSuccessWhenWorkThrows() + { + int maxFailures = 1; + + CompletionStage result = session.readTransactionAsync( new TransactionWork>() + { + final AtomicInteger failures = new AtomicInteger(); + + @Override + public CompletionStage execute( Transaction tx ) + { + if ( failures.getAndIncrement() < maxFailures ) + { + throw new SessionExpiredException( "Oh!" ); + } + return tx.runAsync( "UNWIND range(1, 10) AS x RETURN count(x)" ) + .thenCompose( StatementResultCursor::singleAsync ) + .thenApply( record -> record.get( 0 ).asInt() ); + } + } ); + + assertEquals( 10, getBlocking( result ).intValue() ); + } + + @Test + public void shouldExecuteWriteTransactionUntilSuccessWhenWorkThrows() + { + int maxFailures = 2; + + CompletionStage result = session.writeTransactionAsync( new TransactionWork>() + { + final AtomicInteger failures = new AtomicInteger(); + + @Override + public CompletionStage execute( Transaction tx ) + { + if ( failures.getAndIncrement() < maxFailures ) + { + throw new ServiceUnavailableException( "Oh!" ); + } + return tx.runAsync( "CREATE (n1:TestNode), (n2:TestNode) RETURN 2" ) + .thenCompose( StatementResultCursor::singleAsync ) + .thenApply( record -> record.get( 0 ).asInt() ); + } + } ); + + assertEquals( 2, getBlocking( result ).intValue() ); + assertEquals( 2, countNodesByLabel( "TestNode" ) ); + } + + @Test + public void shouldExecuteReadTransactionUntilSuccessWhenWorkFails() + { + int maxFailures = 3; + + CompletionStage result = session.readTransactionAsync( new TransactionWork>() + { + final AtomicInteger failures = new AtomicInteger(); + + @Override + public CompletionStage execute( Transaction tx ) + { + return tx.runAsync( "RETURN 42" ) + .thenCompose( StatementResultCursor::singleAsync ) + .thenApply( record -> record.get( 0 ).asInt() ) + .thenCompose( result -> + { + if ( failures.getAndIncrement() < maxFailures ) + { + return failedFuture( new TransientException( "A", "B" ) ); + } + return completedFuture( result ); + } ); + } + } ); + + assertEquals( 42, getBlocking( result ).intValue() ); + } + + @Test + public void shouldExecuteWriteTransactionUntilSuccessWhenWorkFails() + { + int maxFailures = 2; + + CompletionStage result = session.writeTransactionAsync( new TransactionWork>() + { + final AtomicInteger failures = new AtomicInteger(); + + @Override + public CompletionStage execute( Transaction tx ) + { + return tx.runAsync( "CREATE (:MyNode) RETURN 'Hello'" ) + .thenCompose( StatementResultCursor::singleAsync ) + .thenApply( record -> record.get( 0 ).asString() ) + .thenCompose( result -> + { + if ( failures.getAndIncrement() < maxFailures ) + { + return failedFuture( new ServiceUnavailableException( "Hi" ) ); + } + return completedFuture( result ); + } ); + } + } ); + + assertEquals( "Hello", getBlocking( result ) ); + assertEquals( 1, countNodesByLabel( "MyNode" ) ); + } + private Future>> runNestedQueries( StatementResultCursor inputCursor ) { CompletableFuture>> resultFuture = new CompletableFuture<>(); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 2b17d2417e..20a8d71d42 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -78,6 +78,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1203,6 +1204,35 @@ public Void execute( Transaction tx ) } } + @Test + public void shouldExecuteTransactionWorkInCallerThread() + { + int maxFailures = 3; + Thread callerThread = Thread.currentThread(); + + try ( Driver driver = newDriver(); + Session session = driver.session() ) + { + String result = session.readTransaction( new TransactionWork() + { + int failures; + + @Override + public String execute( Transaction tx ) + { + assertSame( callerThread, Thread.currentThread() ); + if ( failures++ < maxFailures ) + { + throw new ServiceUnavailableException( "Oh no" ); + } + return "Hello"; + } + } ); + + assertEquals( "Hello", result ); + } + } + private void assumeServerIs31OrLater() { ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );