18
18
*/
19
19
package org .neo4j .driver .stress ;
20
20
21
+ import org .reactivestreams .Publisher ;
21
22
import reactor .core .publisher .Flux ;
23
+ import reactor .core .publisher .Mono ;
22
24
23
25
import java .util .concurrent .CompletableFuture ;
24
26
import java .util .concurrent .CompletionStage ;
27
+ import java .util .concurrent .atomic .AtomicInteger ;
28
+ import java .util .function .Function ;
25
29
26
- import org .neo4j .driver .AccessMode ;
27
30
import org .neo4j .driver .Driver ;
28
31
import org .neo4j .driver .internal .util .Futures ;
29
32
import org .neo4j .driver .reactive .RxSession ;
30
33
import org .neo4j .driver .reactive .RxTransaction ;
34
+ import org .neo4j .driver .summary .ResultSummary ;
31
35
32
36
import static org .junit .jupiter .api .Assertions .assertEquals ;
33
37
@@ -45,17 +49,32 @@ public RxWriteQueryInTx( AbstractStressTestBase<C> stressTest, Driver driver, bo
45
49
public CompletionStage <Void > execute ( C context )
46
50
{
47
51
CompletableFuture <Void > queryFinished = new CompletableFuture <>();
48
- RxSession session = newSession ( AccessMode .WRITE , context );
49
- Flux .usingWhen ( session .beginTransaction (), tx -> tx .run ( "CREATE ()" ).consume (),
50
- RxTransaction ::commit , ( tx , error ) -> tx .rollback (), null ).subscribe (
51
- summary -> {
52
- context .setBookmark ( session .lastBookmark () );
53
- assertEquals ( 1 , summary .counters ().nodesCreated () );
52
+
53
+ Function <RxSession ,Publisher <ResultSummary >> sessionToResultSummaryPublisher = ( RxSession session ) -> Flux .usingWhen (
54
+ Mono .from ( session .beginTransaction () ),
55
+ tx -> tx .run ( "CREATE ()" ).consume (),
56
+ RxTransaction ::commit ,
57
+ ( tx , error ) -> tx .rollback (),
58
+ RxTransaction ::rollback
59
+ );
60
+
61
+ AtomicInteger createdNodesNum = new AtomicInteger ();
62
+ Flux .usingWhen (
63
+ Mono .fromSupplier ( driver ::rxSession ),
64
+ sessionToResultSummaryPublisher ,
65
+ session -> Mono .empty (),
66
+ ( session , error ) -> session .close (),
67
+ RxSession ::close
68
+ ).subscribe (
69
+ resultSummary -> createdNodesNum .addAndGet ( resultSummary .counters ().nodesCreated () ),
70
+ error -> handleError ( Futures .completionExceptionCause ( error ), context , queryFinished ),
71
+ () ->
72
+ {
73
+ assertEquals ( 1 , createdNodesNum .get () );
54
74
context .nodeCreated ();
55
75
queryFinished .complete ( null );
56
- }, error -> {
57
- handleError ( Futures .completionExceptionCause ( error ), context , queryFinished );
58
- } );
76
+ }
77
+ );
59
78
60
79
return queryFinished ;
61
80
}
0 commit comments