From 438de66b2d93fb2520688ca2a2d938591fd158d4 Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Mon, 24 Feb 2020 16:34:12 +0100 Subject: [PATCH 1/3] Ensure error is propagated from rx queries correctly. The rx queries should be semantically the same as the async queries. Fixed a few wrong handling of finish future and error propergation used in rx queries. --- .../org/neo4j/driver/stress/RxFailingQuery.java | 4 ++-- .../org/neo4j/driver/stress/RxFailingQueryInTx.java | 1 - .../driver/stress/RxFailingQueryWithRetries.java | 1 - .../java/org/neo4j/driver/stress/RxWriteQuery.java | 13 +++++++------ .../org/neo4j/driver/stress/RxWriteQueryInTx.java | 11 +++++++---- .../driver/stress/RxWriteQueryWithRetries.java | 13 +++++++------ 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java index d0289e2022..1fe2969a4f 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQuery.java @@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; @@ -51,8 +52,7 @@ public CompletionStage execute( C context ) RxSession::close ) .subscribe( record -> { assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) ); - queryFinished.complete( null ); - }, error -> { + }, error -> { Throwable cause = Futures.completionExceptionCause( error ); assertThat( cause, is( arithmeticError() ) ); queryFinished.complete( null ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java index a85962ca29..dae34a8516 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryInTx.java @@ -52,7 +52,6 @@ public CompletionStage execute( C context ) RxTransaction::commit, ( tx, error ) -> tx.rollback(), null ) .subscribe( record -> { assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) ); - queryFinished.complete( null ); }, error -> { Throwable cause = Futures.completionExceptionCause( error ); assertThat( cause, is( arithmeticError() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java index 4bb3db3e8b..71e5c58238 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxFailingQueryWithRetries.java @@ -51,7 +51,6 @@ public CompletionStage execute( C context ) RxSession::close ) .subscribe( record -> { assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) ); - queryFinished.complete( null ); }, error -> { Throwable cause = Futures.completionExceptionCause( error ); assertThat( cause, is( arithmeticError() ) ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java index f0f71be6e6..3feaa73525 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java @@ -49,22 +49,23 @@ public CompletionStage execute( C context ) Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ), session -> session.run( "CREATE ()" ).consume(), RxSession::close ) .subscribe( summary -> { - queryFinished.complete( null ); assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); - }, error -> { queryFinished.complete( null ); - handleError( Futures.completionExceptionCause( error ), context ); - } ); + }, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) ); return queryFinished; } - private void handleError( Throwable error, C context ) + private void handleError( Throwable error, C context, CompletableFuture queryFinished ) { if ( !stressTest.handleWriteFailure( error, context ) ) { - throw new RuntimeException( error ); + queryFinished.completeExceptionally( error ); + } + else + { + queryFinished.complete( null ); } } } diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java index d71c07f49f..69596e0e51 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java @@ -53,18 +53,21 @@ public CompletionStage execute( C context ) context.nodeCreated(); queryFinished.complete( null ); }, error -> { - handleError( Futures.completionExceptionCause( error ), context ); - queryFinished.complete( null ); + handleError( Futures.completionExceptionCause( error ), context, queryFinished ); } ); return queryFinished; } - private void handleError( Throwable error, C context ) + private void handleError( Throwable error, C context, CompletableFuture queryFinished ) { if ( !stressTest.handleWriteFailure( error, context ) ) { - throw new RuntimeException( error ); + queryFinished.completeExceptionally( error ); + } + else + { + queryFinished.complete( null ); } } } diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java index 4001f95b56..ef209988d6 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java @@ -50,22 +50,23 @@ public CompletionStage execute( C context ) Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ), session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), RxSession::close ) .subscribe( summary -> { - queryFinished.complete( null ); assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); - }, error -> { queryFinished.complete( null ); - handleError( Futures.completionExceptionCause( error ), context ); - } ); + }, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) ); return queryFinished; } - private void handleError( Throwable error, C context ) + private void handleError( Throwable error, C context, CompletableFuture queryFinished ) { if ( !stressTest.handleWriteFailure( error, context ) ) { - throw new RuntimeException( error ); + queryFinished.completeExceptionally( error ); + } + else + { + queryFinished.complete( null ); } } } From f9f773bac74d59a4da2e21248254ca0aedf7d3e8 Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Tue, 25 Feb 2020 17:29:08 +0100 Subject: [PATCH 2/3] Adding bookmark updates in all write queries However this bookmark is not used for querying nodes count. It is only used to randomly generate queries waiting or not waiting for bookmark. --- .../java/org/neo4j/driver/stress/AsyncWriteQuery.java | 1 + .../org/neo4j/driver/stress/AsyncWriteQueryInTx.java | 6 ++++-- .../org/neo4j/driver/stress/BlockingWriteQuery.java | 11 ++++++----- .../java/org/neo4j/driver/stress/RxWriteQuery.java | 4 +++- .../org/neo4j/driver/stress/RxWriteQueryInTx.java | 1 + 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQuery.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQuery.java index 2f93f9b04e..38d05efa33 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQuery.java @@ -55,6 +55,7 @@ public CompletionStage execute( C context ) } else { + context.setBookmark( session.lastBookmark() ); assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); } diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryInTx.java index 8f7af41ab2..0a1403a632 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryInTx.java @@ -45,8 +45,10 @@ public CompletionStage execute( C context ) CompletionStage txCommitted = session.beginTransactionAsync().thenCompose( tx -> tx.runAsync( "CREATE ()" ).thenCompose( cursor -> - cursor.consumeAsync().thenCompose( summary -> - tx.commitAsync().thenApply( ignore -> summary ) ) ) ); + cursor.consumeAsync().thenCompose( summary -> tx.commitAsync().thenApply( ignore -> { + context.setBookmark( session.lastBookmark() ); + return summary; + } ) ) ) ); return txCommitted.handle( ( summary, error ) -> { diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQuery.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQuery.java index e63da25c68..5c59de8e65 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQuery.java @@ -21,7 +21,7 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; import org.neo4j.driver.Session; -import org.neo4j.driver.Result; +import org.neo4j.driver.summary.ResultSummary; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -38,12 +38,13 @@ public BlockingWriteQuery( AbstractStressTestBase stressTest, Driver driver, @Override public void execute( C context ) { - Result result = null; + ResultSummary summary = null; Throwable queryError = null; try ( Session session = newSession( AccessMode.WRITE, context ) ) { - result = session.run( "CREATE ()" ); + summary = session.run( "CREATE ()" ).consume(); + context.setBookmark( session.lastBookmark() ); } catch ( Throwable error ) { @@ -54,9 +55,9 @@ public void execute( C context ) } } - if ( queryError == null && result != null ) + if ( queryError == null && summary != null ) { - assertEquals( 1, result.consume().counters().nodesCreated() ); + assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); } } diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java index 3feaa73525..3f070dc75c 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQuery.java @@ -47,7 +47,9 @@ public CompletionStage execute( C context ) { CompletableFuture queryFinished = new CompletableFuture<>(); Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ), - session -> session.run( "CREATE ()" ).consume(), RxSession::close ) + session -> Flux.from( session.run( "CREATE ()" ).consume() ) + .doOnComplete( () -> context.setBookmark( session.lastBookmark() ) ), + RxSession::close ) .subscribe( summary -> { assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java index 69596e0e51..1d2f9752f6 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java @@ -49,6 +49,7 @@ public CompletionStage execute( C context ) Flux.usingWhen( session.beginTransaction(), tx -> tx.run( "CREATE ()" ).consume(), RxTransaction::commit, ( tx, error ) -> tx.rollback(), null ).subscribe( summary -> { + context.setBookmark( session.lastBookmark() ); assertEquals( 1, summary.counters().nodesCreated() ); context.nodeCreated(); queryFinished.complete( null ); From e054ec5d0e8cfe3ee9d134a141f7a2fb79e4294b Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Wed, 8 Apr 2020 19:42:31 +0200 Subject: [PATCH 3/3] Removes time limit on waiting for reactive session run replies. --- .../org/neo4j/driver/stress/AbstractStressTestBase.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java b/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java index d30d30c6e3..6f65045b0f 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java +++ b/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java @@ -30,7 +30,6 @@ import java.lang.management.OperatingSystemMXBean; import java.lang.reflect.Method; import java.net.URI; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -99,7 +98,6 @@ abstract class AbstractStressTestBase private static final int BIG_DATA_TEST_NODE_COUNT = Integer.getInteger( "bigDataTestNodeCount", 30_000 ); private static final int BIG_DATA_TEST_BATCH_SIZE = Integer.getInteger( "bigDataTestBatchSize", 10_000 ); - private static final Duration DEFAULT_BLOCKING_TIME_OUT = Duration.ofMinutes( 10 ); private LoggerNameTrackingLogging logging; private ExecutorService executor; @@ -638,7 +636,7 @@ private Bookmark createNodesRx( int batchCount, int batchSize, InternalDriver dr Flux.concat( Flux.range( 0, batchCount ).map( batchIndex -> session.writeTransaction( tx -> createNodesInTxRx( tx, batchIndex, batchSize ) ) - ) ).blockLast( DEFAULT_BLOCKING_TIME_OUT ); // throw any error if happened + ) ).blockLast(); // throw any error if happened long end = System.nanoTime(); System.out.println( "Node creation with reactive API took: " + NANOSECONDS.toMillis( end - start ) + "ms" ); @@ -673,7 +671,7 @@ private void readNodesRx( InternalDriver driver, Bookmark bookmark, int expected verifyNodeProperties( node ); } ).then() ); - Flux.from( readQuery ).blockLast( DEFAULT_BLOCKING_TIME_OUT ); + Flux.from( readQuery ).blockLast(); assertEquals( expectedNodeCount, nodesSeen.get() );