Skip to content

Commit 331ca64

Browse files
committed
Updating stress tests exception and session handling
Exception handling is updated to expose the leader switching exceptions for managed transactions. Async session handling has been updated to close session and report problems if they arise.
1 parent 57eb7ea commit 331ca64

8 files changed

+98
-90
lines changed

driver/src/test/java/org/neo4j/driver/stress/AsyncFailingQueryWithRetries.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
*/
1919
package org.neo4j.driver.stress;
2020

21+
import java.util.List;
2122
import java.util.concurrent.CompletionStage;
2223

2324
import org.neo4j.driver.AccessMode;
2425
import org.neo4j.driver.Driver;
26+
import org.neo4j.driver.Record;
2527
import org.neo4j.driver.async.AsyncSession;
2628
import org.neo4j.driver.async.ResultCursor;
2729
import org.neo4j.driver.internal.util.Futures;
@@ -43,17 +45,19 @@ public CompletionStage<Void> execute( C context )
4345
{
4446
AsyncSession session = newSession( AccessMode.READ, context );
4547

46-
return session.readTransactionAsync( tx -> tx.runAsync( "UNWIND [10, 5, 0] AS x RETURN 10 / x" )
47-
.thenCompose( ResultCursor::listAsync )
48-
.handle( ( records, error ) ->
49-
{
50-
session.closeAsync();
48+
CompletionStage<List<Record>> txStage = session.readTransactionAsync( tx -> tx.runAsync( "UNWIND [10, 5, 0] AS x RETURN 10 / x" )
49+
.thenCompose( ResultCursor::listAsync ) );
5150

52-
assertNull( records );
53-
Throwable cause = Futures.completionExceptionCause( error );
54-
assertThat( cause, is( arithmeticError() ) );
51+
CompletionStage<Void> resultsProcessingStage = txStage
52+
.handle( ( records, error ) ->
53+
{
54+
assertNull( records );
55+
Throwable cause = Futures.completionExceptionCause( error );
56+
assertThat( cause, is( arithmeticError() ) );
5557

56-
return null;
57-
} ));
58+
return null;
59+
} );
60+
61+
return resultsProcessingStage.whenComplete( ( nothing, throwable ) -> session.closeAsync() );
5862
}
5963
}

driver/src/test/java/org/neo4j/driver/stress/AsyncReadQueryWithRetries.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,19 @@ public CompletionStage<Void> execute( C context )
4242
{
4343
AsyncSession session = newSession( AccessMode.READ, context );
4444

45-
CompletionStage<ResultSummary> queryFinished = session.readTransactionAsync(
45+
CompletionStage<ResultSummary> txStage = session.readTransactionAsync(
4646
tx -> tx.runAsync( "MATCH (n) RETURN n LIMIT 1" )
4747
.thenCompose(
4848
cursor -> cursor.nextAsync()
49-
.thenCompose( record -> processAndGetSummary( record, cursor ) ) ) );
49+
.thenCompose(
50+
record -> processRecordAndGetSummary( record, cursor ) ) ) );
5051

51-
queryFinished.whenComplete( ( summary, error ) ->
52-
{
53-
if ( summary != null )
54-
{
55-
context.readCompleted( summary );
56-
}
57-
session.closeAsync();
58-
} );
52+
CompletionStage<Void> resultsProcessingStage = txStage.thenApply( resultSummary -> processResultSummary( resultSummary, context ) );
5953

60-
return queryFinished.thenApply( summary -> null );
54+
return resultsProcessingStage.whenComplete( ( nothing, throwable ) -> session.closeAsync() );
6155
}
6256

63-
private CompletionStage<ResultSummary> processAndGetSummary( Record record, ResultCursor cursor )
57+
private CompletionStage<ResultSummary> processRecordAndGetSummary( Record record, ResultCursor cursor )
6458
{
6559
if ( record != null )
6660
{
@@ -69,4 +63,13 @@ private CompletionStage<ResultSummary> processAndGetSummary( Record record, Resu
6963
}
7064
return cursor.consumeAsync();
7165
}
66+
67+
private Void processResultSummary( ResultSummary resultSummary, C context )
68+
{
69+
if ( resultSummary != null )
70+
{
71+
context.readCompleted( resultSummary );
72+
}
73+
return null;
74+
}
7275
}

driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryWithRetries.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.neo4j.driver.Driver;
2525
import org.neo4j.driver.async.AsyncSession;
2626
import org.neo4j.driver.async.ResultCursor;
27-
import org.neo4j.driver.internal.util.Futures;
27+
import org.neo4j.driver.summary.ResultSummary;
2828

2929
import static org.junit.jupiter.api.Assertions.assertEquals;
3030

@@ -43,33 +43,34 @@ public CompletionStage<Void> execute( C context )
4343
{
4444
AsyncSession session = newSession( AccessMode.WRITE, context );
4545

46-
return session.writeTransactionAsync(
47-
tx -> tx.runAsync( "CREATE ()" )
48-
.thenCompose( ResultCursor::consumeAsync ) )
49-
.handle( ( summary, error ) ->
50-
{
51-
session.closeAsync();
46+
CompletionStage<ResultSummary> txStage = session.writeTransactionAsync(
47+
tx -> tx.runAsync( "CREATE ()" ).thenCompose( ResultCursor::consumeAsync ) );
5248

53-
if ( error != null )
54-
{
55-
handleError( Futures.completionExceptionCause( error ), context );
56-
}
57-
else
58-
{
59-
context.setBookmark( session.lastBookmark() );
60-
assertEquals( 1, summary.counters().nodesCreated() );
61-
context.nodeCreated();
62-
}
49+
return txStage.thenApply( resultSummary -> processResultSummary( resultSummary, context ) )
50+
.handle( ( nothing, throwable ) -> recordAndRethrowThrowable( throwable, context ) )
51+
.whenComplete( ( nothing, throwable ) -> finalizeSession( session, context ) );
52+
}
6353

64-
return null;
65-
} );
54+
private Void processResultSummary( ResultSummary resultSummary, C context )
55+
{
56+
assertEquals( 1, resultSummary.counters().nodesCreated() );
57+
context.nodeCreated();
58+
return null;
6659
}
6760

68-
private void handleError( Throwable error, C context )
61+
private Void recordAndRethrowThrowable( Throwable throwable, C context )
6962
{
70-
if ( !stressTest.handleWriteFailure( error, context ) )
63+
if ( throwable != null )
7164
{
72-
throw new RuntimeException( error );
65+
stressTest.handleWriteFailure( throwable, context );
66+
throw new RuntimeException( throwable );
7367
}
68+
return null;
69+
}
70+
71+
private void finalizeSession( AsyncSession session, C context )
72+
{
73+
context.setBookmark( session.lastBookmark() );
74+
session.closeAsync();
7475
}
7576
}

driver/src/test/java/org/neo4j/driver/stress/AsyncWrongQueryWithRetries.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919
package org.neo4j.driver.stress;
2020

2121
import java.util.concurrent.CompletionStage;
22+
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import org.neo4j.driver.AccessMode;
2425
import org.neo4j.driver.Driver;
26+
import org.neo4j.driver.Record;
2527
import org.neo4j.driver.async.AsyncSession;
26-
import org.neo4j.driver.async.ResultCursor;
2728
import org.neo4j.driver.exceptions.ClientException;
2829
import org.neo4j.driver.exceptions.Neo4jException;
2930
import org.neo4j.driver.internal.util.Futures;
31+
import org.neo4j.driver.summary.ResultSummary;
3032

3133
import static org.hamcrest.Matchers.containsString;
3234
import static org.hamcrest.Matchers.instanceOf;
@@ -46,19 +48,37 @@ public CompletionStage<Void> execute( C context )
4648
{
4749
AsyncSession session = newSession( AccessMode.READ, context );
4850

49-
return session.readTransactionAsync(tx -> tx.runAsync("RETURN Wrong" )
50-
.thenCompose( ResultCursor::nextAsync )
51-
.handle( ( record, error ) ->
52-
{
53-
session.closeAsync();
54-
assertNull( record );
51+
AtomicReference<Record> recordRef = new AtomicReference<>();
52+
AtomicReference<Throwable> throwableRef = new AtomicReference<>();
5553

56-
Throwable cause = Futures.completionExceptionCause( error );
57-
assertNotNull( cause );
58-
assertThat( cause, instanceOf( ClientException.class ) );
59-
assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) );
54+
CompletionStage<ResultSummary> txStage = session.readTransactionAsync(
55+
tx -> tx.runAsync( "RETURN Wrong" )
56+
.thenCompose(
57+
cursor -> cursor.nextAsync()
58+
.thenCompose(
59+
record ->
60+
{
61+
recordRef.set( record );
62+
return cursor.consumeAsync();
63+
} ) ) );
6064

61-
return null;
62-
} ));
65+
CompletionStage<Void> resultsProcessingStage = txStage
66+
.handle( ( resultSummary, throwable ) ->
67+
{
68+
throwableRef.set( throwable );
69+
return null;
70+
} )
71+
.thenApply( nothing ->
72+
{
73+
assertNull( recordRef.get() );
74+
75+
Throwable cause = Futures.completionExceptionCause( throwableRef.get() );
76+
assertNotNull( cause );
77+
assertThat( cause, instanceOf( ClientException.class ) );
78+
assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) );
79+
return null;
80+
} );
81+
82+
return resultsProcessingStage.whenComplete( ( nothing, throwable ) -> session.closeAsync() );
6383
}
6484
}

driver/src/test/java/org/neo4j/driver/stress/BlockingFailingQueryWithRetries.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,7 @@ public void execute( C context )
4141
{
4242
Exception e = assertThrows(
4343
Exception.class,
44-
() -> session.readTransaction(
45-
tx ->
46-
{
47-
tx.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).consume();
48-
return 1;
49-
} ) );
44+
() -> session.readTransaction( tx -> tx.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).consume() ) );
5045
assertThat( e, is( arithmeticError() ) );
5146
}
5247
}

driver/src/test/java/org/neo4j/driver/stress/BlockingReadQueryWithRetries.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.neo4j.driver.Record;
2626
import org.neo4j.driver.Result;
2727
import org.neo4j.driver.Session;
28+
import org.neo4j.driver.summary.ResultSummary;
2829
import org.neo4j.driver.types.Node;
2930

3031
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -42,7 +43,7 @@ public void execute( C context )
4243
{
4344
try ( Session session = newSession( AccessMode.READ, context ) )
4445
{
45-
session.readTransaction(
46+
ResultSummary resultSummary = session.readTransaction(
4647
tx ->
4748
{
4849
Result result = tx.run( "MATCH (n) RETURN n LIMIT 1" );
@@ -54,9 +55,9 @@ public void execute( C context )
5455
assertNotNull( node );
5556
}
5657

57-
context.readCompleted( result.consume() );
58-
return 1;
58+
return result.consume();
5959
} );
60+
context.readCompleted( resultSummary );
6061
}
6162
}
6263
}

driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryWithRetries.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,17 @@ public BlockingWriteQueryWithRetries( AbstractStressTestBase<C> stressTest, Driv
3838
@Override
3939
public void execute( C context )
4040
{
41-
ResultSummary resultSummary = null;
42-
Throwable queryError = null;
43-
4441
try ( Session session = newSession( AccessMode.WRITE, context ) )
4542
{
46-
resultSummary = session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() );
43+
ResultSummary resultSummary = session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() );
44+
assertEquals( 1, resultSummary.counters().nodesCreated() );
45+
context.nodeCreated();
4746
context.setBookmark( session.lastBookmark() );
4847
}
49-
catch ( Throwable error )
50-
{
51-
queryError = error;
52-
if ( !stressTest.handleWriteFailure( error, context ) )
53-
{
54-
throw error;
55-
}
56-
}
57-
58-
if ( queryError == null && resultSummary != null )
48+
catch ( RuntimeException error )
5949
{
60-
assertEquals( 1, resultSummary.counters().nodesCreated() );
61-
context.nodeCreated();
50+
stressTest.handleWriteFailure( error, context );
51+
throw error;
6252
}
6353
}
6454
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,7 @@ public CompletionStage<Void> execute( C context )
6969

7070
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
7171
{
72-
if ( !stressTest.handleWriteFailure( error, context ) )
73-
{
74-
queryFinished.completeExceptionally( error );
75-
}
76-
else
77-
{
78-
queryFinished.complete( null );
79-
}
72+
stressTest.handleWriteFailure( error, context );
73+
queryFinished.completeExceptionally( error );
8074
}
8175
}

0 commit comments

Comments
 (0)