22
22
import org .junit .jupiter .api .AfterEach ;
23
23
import org .junit .jupiter .api .BeforeEach ;
24
24
import org .junit .jupiter .api .Test ;
25
+ import org .reactivestreams .Publisher ;
26
+ import reactor .core .publisher .Flux ;
25
27
26
28
import java .lang .management .ManagementFactory ;
27
29
import java .lang .management .OperatingSystemMXBean ;
28
30
import java .lang .reflect .Method ;
29
31
import java .net .URI ;
32
+ import java .time .Duration ;
30
33
import java .util .ArrayList ;
31
34
import java .util .Arrays ;
32
35
import java .util .HashMap ;
66
69
import org .neo4j .driver .internal .util .Futures ;
67
70
import org .neo4j .driver .internal .util .Iterables ;
68
71
import org .neo4j .driver .internal .util .Neo4jFeature ;
72
+ import org .neo4j .driver .reactive .RxSession ;
73
+ import org .neo4j .driver .reactive .RxTransaction ;
69
74
import org .neo4j .driver .types .Node ;
70
75
import org .neo4j .driver .util .DaemonThreadFactory ;
71
76
@@ -91,6 +96,7 @@ abstract class AbstractStressTestBase<C extends AbstractContext>
91
96
92
97
private static final int BIG_DATA_TEST_NODE_COUNT = Integer .getInteger ( "bigDataTestNodeCount" , 30_000 );
93
98
private static final int BIG_DATA_TEST_BATCH_SIZE = Integer .getInteger ( "bigDataTestBatchSize" , 10_000 );
99
+ private static final Duration DEFAULT_BLOCKING_TIME_OUT = Duration .ofMinutes ( 5 );
94
100
95
101
private LoggerNameTrackingLogging logging ;
96
102
private ExecutorService executor ;
@@ -159,6 +165,14 @@ void asyncApiBigDataTest() throws Throwable
159
165
readNodesAsync ( driver , bookmark , BIG_DATA_TEST_NODE_COUNT );
160
166
}
161
167
168
+ @ Test
169
+ @ EnabledOnNeo4jWith ( Neo4jFeature .BOLT_V4 )
170
+ void rxApiBigDataTest () throws Throwable
171
+ {
172
+ String bookmark = createNodesRx ( bigDataTestBatchCount (), BIG_DATA_TEST_BATCH_SIZE , driver );
173
+ readNodesRx ( driver , bookmark , BIG_DATA_TEST_NODE_COUNT );
174
+ }
175
+
162
176
private void runStressTest ( Function <C ,List <Future <?>>> threadLauncher ) throws Throwable
163
177
{
164
178
C context = createContext ();
@@ -603,6 +617,57 @@ private static void readNodesAsync( Driver driver, String bookmark, int expected
603
617
System .out .println ( "Reading nodes with async API took: " + NANOSECONDS .toMillis ( end - start ) + "ms" );
604
618
}
605
619
620
+ private String createNodesRx ( int batchCount , int batchSize , InternalDriver driver )
621
+ {
622
+ long start = System .nanoTime ();
623
+
624
+ RxSession session = driver .rxSession ();
625
+
626
+ Flux .concat ( Flux .range ( 0 , batchCount ).map ( batchIndex ->
627
+ session .writeTransaction ( tx -> createNodesInTxRx ( tx , batchIndex , batchSize ) )
628
+ ) ).blockLast ( DEFAULT_BLOCKING_TIME_OUT ); // throw any error if happened
629
+
630
+ long end = System .nanoTime ();
631
+ System .out .println ( "Node creation with reactive API took: " + NANOSECONDS .toMillis ( end - start ) + "ms" );
632
+
633
+ return session .lastBookmark ();
634
+ }
635
+
636
+ private Publisher <Void > createNodesInTxRx ( RxTransaction tx , int batchIndex , int batchSize )
637
+ {
638
+ return Flux .concat ( Flux .range ( 0 , batchSize ).map ( index -> batchIndex * batchSize + index ).map ( nodeIndex -> {
639
+ Statement statement = createNodeInTxStatement ( nodeIndex );
640
+ return Flux .from ( tx .run ( statement ).summary () ).then (); // As long as there is no error
641
+ } ) );
642
+ }
643
+
644
+ private void readNodesRx ( InternalDriver driver , String bookmark , int expectedNodeCount )
645
+ {
646
+ long start = System .nanoTime ();
647
+
648
+ RxSession session = driver .rxSession ( t -> t .withBookmarks ( bookmark ) );
649
+ AtomicInteger nodesSeen = new AtomicInteger ();
650
+
651
+ Publisher <Void > readQuery = session .readTransaction ( tx -> Flux .from ( tx .run ( "MATCH (n:Node) RETURN n" ).records () ).doOnNext ( record -> {
652
+ Node node = record .get ( 0 ).asNode ();
653
+ nodesSeen .incrementAndGet ();
654
+
655
+ List <String > labels = Iterables .asList ( node .labels () );
656
+ assertEquals ( 2 , labels .size () );
657
+ assertTrue ( labels .contains ( "Test" ) );
658
+ assertTrue ( labels .contains ( "Node" ) );
659
+
660
+ verifyNodeProperties ( node );
661
+ } ).then () );
662
+
663
+ Flux .from ( readQuery ).blockLast ( DEFAULT_BLOCKING_TIME_OUT );
664
+
665
+ assertEquals ( expectedNodeCount , nodesSeen .get () );
666
+
667
+ long end = System .nanoTime ();
668
+ System .out .println ( "Reading nodes with async API took: " + NANOSECONDS .toMillis ( end - start ) + "ms" );
669
+ }
670
+
606
671
private static Void createNodesInTx ( Transaction tx , int batchIndex , int batchSize )
607
672
{
608
673
for ( int index = 0 ; index < batchSize ; index ++ )
0 commit comments