Skip to content

Commit 62cd397

Browse files
committed
Better handle sync failures in async tx functions
Given work might fail by throwing exception or by returning a failed future. This commit makes retries gracefully handle exceptions. Previously retrying code would only log an exception and hang.
1 parent be6225d commit 62cd397

File tree

3 files changed

+90
-21
lines changed

3 files changed

+90
-21
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ else if ( future.isSuccess() )
453453
private <T> void executeWork( final InternalPromise<T> resultPromise, final ExplicitTransaction tx,
454454
TransactionWork<Response<T>> work )
455455
{
456-
Response<T> workResponse = work.execute( tx );
456+
Response<T> workResponse = safeExecuteWork( tx, work );
457457
workResponse.addListener( new ResponseListener<T>()
458458
{
459459
@Override
@@ -471,6 +471,22 @@ public void operationCompleted( T result, Throwable error )
471471
} );
472472
}
473473

474+
private <T> Response<T> safeExecuteWork( ExplicitTransaction tx, TransactionWork<Response<T>> work )
475+
{
476+
// given work might fail in both async and sync way
477+
// async failure will result in a failed future being returned
478+
// sync failure will result in an exception being thrown
479+
try
480+
{
481+
return work.execute( tx );
482+
}
483+
catch ( Throwable workError )
484+
{
485+
// work threw an exception, wrap it in a future and proceed
486+
return new InternalPromise<T>( eventExecutorGroup ).setFailure( workError );
487+
}
488+
}
489+
474490
private <T> void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx,
475491
final InternalPromise<T> resultPromise, final Throwable error )
476492
{

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.Arrays;
31-
import java.util.Collections;
3231
import java.util.Iterator;
3332
import java.util.List;
3433
import java.util.concurrent.Future;
@@ -46,6 +45,7 @@
4645
import org.neo4j.driver.v1.TransactionWork;
4746
import org.neo4j.driver.v1.Value;
4847
import org.neo4j.driver.v1.exceptions.ClientException;
48+
import org.neo4j.driver.v1.exceptions.DatabaseException;
4949
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
5050
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
5151
import org.neo4j.driver.v1.exceptions.TransientException;
@@ -54,6 +54,7 @@
5454
import org.neo4j.driver.v1.types.Node;
5555
import org.neo4j.driver.v1.util.TestNeo4j;
5656

57+
import static java.util.Collections.emptyIterator;
5758
import static org.hamcrest.Matchers.containsString;
5859
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5960
import static org.hamcrest.Matchers.instanceOf;
@@ -371,11 +372,13 @@ public void shouldRunAsyncTransactionWithoutRetries()
371372
}
372373

373374
@Test
374-
public void shouldRunAsyncTransactionWithRetries()
375+
public void shouldRunAsyncTransactionWithRetriesOnAsyncFailures()
375376
{
376-
List<Throwable> failures = Arrays.<Throwable>asList( new ServiceUnavailableException( "Oh!" ),
377-
new SessionExpiredException( "Ah!" ), new TransientException( "Code", "Message" ) );
378-
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24", failures );
377+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24" ).withAsyncFailures(
378+
new ServiceUnavailableException( "Oh!" ),
379+
new SessionExpiredException( "Ah!" ),
380+
new TransientException( "Code", "Message" ) );
381+
379382
Response<Record> txResponse = session.writeTransactionAsync( work );
380383

381384
Record record = await( txResponse );
@@ -386,6 +389,23 @@ public void shouldRunAsyncTransactionWithRetries()
386389
assertEquals( 1, countNodesByLabel( "Node" ) );
387390
}
388391

392+
@Test
393+
public void shouldRunAsyncTransactionWithRetriesOnSyncFailures()
394+
{
395+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Test) RETURN 12" ).withSyncFailures(
396+
new TransientException( "Oh!", "Deadlock!" ),
397+
new ServiceUnavailableException( "Oh! Network Failure" ) );
398+
399+
Response<Record> txResponse = session.writeTransactionAsync( work );
400+
401+
Record record = await( txResponse );
402+
assertNotNull( record );
403+
assertEquals( 12L, record.get( 0 ).asLong() );
404+
405+
assertEquals( 3, work.invocationCount() );
406+
assertEquals( 1, countNodesByLabel( "Test" ) );
407+
}
408+
389409
@Test
390410
public void shouldRunAsyncTransactionThatCanNotBeRetried()
391411
{
@@ -406,6 +426,32 @@ public void shouldRunAsyncTransactionThatCanNotBeRetried()
406426
assertEquals( 0, countNodesByLabel( "Hi" ) );
407427
}
408428

429+
@Test
430+
public void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure()
431+
{
432+
// first throw TransientException directly from work, retry can happen afterwards
433+
// then return a future failed with DatabaseException, retry can't happen afterwards
434+
InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Person) RETURN 1" )
435+
.withSyncFailures( new TransientException( "Oh!", "Deadlock!" ) )
436+
.withAsyncFailures( new DatabaseException( "Oh!", "OutOfMemory!" ) );
437+
Response<Record> txResponse = session.writeTransactionAsync( work );
438+
439+
try
440+
{
441+
await( txResponse );
442+
fail( "Exception expected" );
443+
}
444+
catch ( Exception e )
445+
{
446+
assertThat( e, instanceOf( DatabaseException.class ) );
447+
assertEquals( 1, e.getSuppressed().length );
448+
assertThat( e.getSuppressed()[0], instanceOf( TransientException.class ) );
449+
}
450+
451+
assertEquals( 2, work.invocationCount() );
452+
assertEquals( 0, countNodesByLabel( "Person" ) );
453+
}
454+
409455
private Future<List<Future<Boolean>>> runNestedQueries( StatementResultCursor inputCursor )
410456
{
411457
Promise<List<Future<Boolean>>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise();
@@ -524,19 +570,27 @@ void killDb()
524570
private static class InvocationTrackingWork implements TransactionWork<Response<Record>>
525571
{
526572
final String query;
527-
final Iterator<Throwable> failures;
528573
final AtomicInteger invocationCount;
529574

575+
Iterator<RuntimeException> asyncFailures = emptyIterator();
576+
Iterator<RuntimeException> syncFailures = emptyIterator();
577+
530578
InvocationTrackingWork( String query )
531579
{
532-
this( query, Collections.<Throwable>emptyList() );
580+
this.query = query;
581+
this.invocationCount = new AtomicInteger();
582+
}
583+
584+
InvocationTrackingWork withAsyncFailures( RuntimeException... failures )
585+
{
586+
asyncFailures = Arrays.asList( failures ).iterator();
587+
return this;
533588
}
534589

535-
InvocationTrackingWork( String query, List<Throwable> failures )
590+
InvocationTrackingWork withSyncFailures( RuntimeException... failures )
536591
{
537-
this.query = query;
538-
this.failures = failures.iterator();
539-
this.invocationCount = new AtomicInteger();
592+
syncFailures = Arrays.asList( failures ).iterator();
593+
return this;
540594
}
541595

542596
int invocationCount()
@@ -549,6 +603,11 @@ public Response<Record> execute( Transaction tx )
549603
{
550604
invocationCount.incrementAndGet();
551605

606+
if ( syncFailures.hasNext() )
607+
{
608+
throw syncFailures.next();
609+
}
610+
552611
final InternalPromise<Record> resultPromise = new InternalPromise<>( GlobalEventExecutor.INSTANCE );
553612

554613
tx.runAsync( query ).addListener( new ResponseListener<StatementResultCursor>()
@@ -597,9 +656,9 @@ private void processFetchResult( Boolean recordAvailable, Throwable error,
597656
return;
598657
}
599658

600-
if ( failures.hasNext() )
659+
if ( asyncFailures.hasNext() )
601660
{
602-
resultPromise.setFailure( failures.next() );
661+
resultPromise.setFailure( asyncFailures.next() );
603662
}
604663
else
605664
{

driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,7 @@ public static <T, F extends Future<T>> T await( F future )
6868
}
6969
catch ( ExecutionException e )
7070
{
71-
Throwable cause = e.getCause();
72-
StackTraceElement[] originalStackTrace = cause.getStackTrace();
73-
RuntimeException exceptionWithOriginalStackTrace = new RuntimeException();
74-
cause.setStackTrace( exceptionWithOriginalStackTrace.getStackTrace() );
75-
exceptionWithOriginalStackTrace.setStackTrace( originalStackTrace );
76-
cause.addSuppressed( exceptionWithOriginalStackTrace );
77-
throwException( cause );
71+
throwException( e.getCause() );
7872
return null;
7973
}
8074
catch ( TimeoutException e )

0 commit comments

Comments
 (0)