Skip to content

Commit e818e55

Browse files
author
Zhen Li
committed
Ensure error is propagated from rx queries correctly.
The rx queries should be semantically the same as the async queries. Fixed a few wrong handling of finish future and error propergation used in rx queries.
1 parent 546cef1 commit e818e55

File tree

6 files changed

+23
-20
lines changed

6 files changed

+23
-20
lines changed

driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import org.neo4j.driver.AccessMode;
2829
import org.neo4j.driver.Driver;
@@ -51,8 +52,7 @@ public CompletionStage<Void> execute( C context )
5152
RxSession::close )
5253
.subscribe( record -> {
5354
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
55-
}, error -> {
55+
}, error -> {
5656
Throwable cause = Futures.completionExceptionCause( error );
5757
assertThat( cause, is( arithmeticError() ) );
5858
queryFinished.complete( null );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public CompletionStage<Void> execute( C context )
5252
RxTransaction::commit, ( tx, error ) -> tx.rollback(), null )
5353
.subscribe( record -> {
5454
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
55-
queryFinished.complete( null );
5655
}, error -> {
5756
Throwable cause = Futures.completionExceptionCause( error );
5857
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public CompletionStage<Void> execute( C context )
5151
RxSession::close )
5252
.subscribe( record -> {
5353
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54-
queryFinished.complete( null );
5554
}, error -> {
5655
Throwable cause = Futures.completionExceptionCause( error );
5756
assertThat( cause, is( arithmeticError() ) );

driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,23 @@ public CompletionStage<Void> execute( C context )
4848
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ),
4949
session -> session.run( "CREATE ()" ).consume(), RxSession::close )
5050
.subscribe( summary -> {
51-
queryFinished.complete( null );
5251
assertEquals( 1, summary.counters().nodesCreated() );
5352
context.nodeCreated();
54-
}, error -> {
5553
queryFinished.complete( null );
56-
handleError( Futures.completionExceptionCause( error ), context );
57-
} );
54+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
5855

5956
return queryFinished;
6057
}
6158

62-
private void handleError( Throwable error, C context )
59+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6360
{
6461
if ( !stressTest.handleWriteFailure( error, context ) )
6562
{
66-
throw new RuntimeException( error );
63+
queryFinished.completeExceptionally( error );
64+
}
65+
else
66+
{
67+
queryFinished.complete( null );
6768
}
6869
}
6970
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,21 @@ public CompletionStage<Void> execute( C context )
5353
context.nodeCreated();
5454
queryFinished.complete( null );
5555
}, error -> {
56-
handleError( Futures.completionExceptionCause( error ), context );
57-
queryFinished.complete( null );
56+
handleError( Futures.completionExceptionCause( error ), context, queryFinished );
5857
} );
5958

6059
return queryFinished;
6160
}
6261

63-
private void handleError( Throwable error, C context )
62+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6463
{
6564
if ( !stressTest.handleWriteFailure( error, context ) )
6665
{
67-
throw new RuntimeException( error );
66+
queryFinished.completeExceptionally( error );
67+
}
68+
else
69+
{
70+
queryFinished.complete( null );
6871
}
6972
}
7073
}

driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,23 @@ public CompletionStage<Void> execute( C context )
4848
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.READ, context ) ),
4949
session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), RxSession::close )
5050
.subscribe( summary -> {
51-
queryFinished.complete( null );
5251
assertEquals( 1, summary.counters().nodesCreated() );
5352
context.nodeCreated();
54-
}, error -> {
5553
queryFinished.complete( null );
56-
handleError( Futures.completionExceptionCause( error ), context );
57-
} );
54+
}, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) );
5855

5956
return queryFinished;
6057
}
6158

62-
private void handleError( Throwable error, C context )
59+
private void handleError( Throwable error, C context, CompletableFuture<Void> queryFinished )
6360
{
6461
if ( !stressTest.handleWriteFailure( error, context ) )
6562
{
66-
throw new RuntimeException( error );
63+
queryFinished.completeExceptionally( error );
64+
}
65+
else
66+
{
67+
queryFinished.complete( null );
6768
}
6869
}
6970
}

0 commit comments

Comments
 (0)