Skip to content

Commit 438de66

Browse files
author
Zhen Li
committed
Ensure error is propagated from rx queries correctly.
The rx queries should be semantically the same as the async queries. Fixed a few wrong handling of finish future and error propergation used in rx queries.
1 parent 38c3738 commit 438de66

File tree

6 files changed

+23
-20
lines changed

6 files changed

+23
-20
lines changed

driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import org.neo4j.driver.AccessMode;
2829
import org.neo4j.driver.Driver;
@@ -51,8 +52,7 @@ public CompletionStage<Void> execute( C context )
5152
RxSession::close )
5253
.subscribe( record -> {
5354
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
55-
}, error -> {
55+
}, error -> {
5656
Throwable cause = Futures.completionExceptionCause( error );
5757
assertThat( cause, is( arithmeticError() ) );
5858
queryFinished.complete( null );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public CompletionStage<Void> execute( C context )
5252
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null )
5353
.subscribe( record -> {
5454
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
55-
queryFinished.complete( null );
5655
}, error -> {
5756
Throwable cause = Futures.completionExceptionCause( error );
5857
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public CompletionStage<Void> execute( C context )
5151
RxSession::close )
5252
.subscribe( record -> {
5353
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
5554
}, error -> {
5655
Throwable cause = Futures.completionExceptionCause( error );
5756
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,23 @@ public CompletionStage<Void> execute( C context )
4949
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ),
5050
session -> session.run( "CREATE ()" ).consume(), RxSession::close )
5151
.subscribe( summary -> {
52-
queryFinished.complete( null );
5352
assertEquals( 1, summary.counters().nodesCreated() );
5453
context.nodeCreated();
55-
}, error -> {
5654
queryFinished.complete( null );
57-
handleError( Futures.completionExceptionCause( error ), context );
58-
} );
55+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
5956

6057
return queryFinished;
6158
}
6259

63-
private void handleError( Throwable error, C context )
60+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6461
{
6562
if ( !stressTest.handleWriteFailure( error, context ) )
6663
{
67-
throw new RuntimeException( error );
64+
queryFinished.completeExceptionally( error );
65+
}
66+
else
67+
{
68+
queryFinished.complete( null );
6869
}
6970
}
7071
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,21 @@ public CompletionStage<Void> execute( C context )
5353
context.nodeCreated();
5454
queryFinished.complete( null );
5555
}, error -> {
56-
handleError( Futures.completionExceptionCause( error ), context );
57-
queryFinished.complete( null );
56+
handleError( Futures.completionExceptionCause( error ), context, queryFinished );
5857
} );
5958

6059
return queryFinished;
6160
}
6261

63-
private void handleError( Throwable error, C context )
62+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6463
{
6564
if ( !stressTest.handleWriteFailure( error, context ) )
6665
{
67-
throw new RuntimeException( error );
66+
queryFinished.completeExceptionally( error );
67+
}
68+
else
69+
{
70+
queryFinished.complete( null );
6871
}
6972
}
7073
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,22 +50,23 @@ public CompletionStage<Void> execute( C context )
5050
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ),
5151
session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), RxSession::close )
5252
.subscribe( summary -> {
53-
queryFinished.complete( null );
5453
assertEquals( 1, summary.counters().nodesCreated() );
5554
context.nodeCreated();
56-
}, error -> {
5755
queryFinished.complete( null );
58-
handleError( Futures.completionExceptionCause( error ), context );
59-
} );
56+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
6057

6158
return queryFinished;
6259
}
6360

64-
private void handleError( Throwable error, C context )
61+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6562
{
6663
if ( !stressTest.handleWriteFailure( error, context ) )
6764
{
68-
throw new RuntimeException( error );
65+
queryFinished.completeExceptionally( error );
66+
}
67+
else
68+
{
69+
queryFinished.complete( null );
6970
}
7071
}
7172
}

0 commit comments

Comments
 (0)