22
22
import org .junit .jupiter .api .AfterEach ;
23
23
import org .junit .jupiter .api .BeforeEach ;
24
24
import org .junit .jupiter .api .Test ;
25
+ import org .reactivestreams .Publisher ;
26
+ import reactor .core .publisher .Flux ;
25
27
26
28
import java .lang .management .ManagementFactory ;
27
29
import java .lang .management .OperatingSystemMXBean ;
28
30
import java .lang .reflect .Method ;
29
31
import java .net .URI ;
32
+ import java .time .Duration ;
30
33
import java .util .ArrayList ;
34
+ import java .util .Arrays ;
31
35
import java .util .HashMap ;
32
36
import java .util .HashSet ;
33
37
import java .util .List ;
61
65
import org .neo4j .driver .async .StatementResultCursor ;
62
66
import org .neo4j .driver .internal .InternalDriver ;
63
67
import org .neo4j .driver .internal .logging .DevNullLogger ;
68
+ import org .neo4j .driver .internal .util .EnabledOnNeo4jWith ;
64
69
import org .neo4j .driver .internal .util .Futures ;
65
70
import org .neo4j .driver .internal .util .Iterables ;
71
+ import org .neo4j .driver .internal .util .Neo4jFeature ;
72
+ import org .neo4j .driver .reactive .RxSession ;
73
+ import org .neo4j .driver .reactive .RxTransaction ;
66
74
import org .neo4j .driver .types .Node ;
67
75
import org .neo4j .driver .util .DaemonThreadFactory ;
68
76
    // Size of the big-data tests; overridable via system properties so slower CI machines can shrink them.
    private static final int BIG_DATA_TEST_NODE_COUNT = Integer.getInteger( "bigDataTestNodeCount", 30_000 );
    private static final int BIG_DATA_TEST_BATCH_SIZE = Integer.getInteger( "bigDataTestBatchSize", 10_000 );
    // Upper bound for blocking on reactive pipelines so a hung publisher fails the test instead of deadlocking it.
    private static final Duration DEFAULT_BLOCKING_TIME_OUT = Duration.ofMinutes( 5 );

    private LoggerNameTrackingLogging logging;
    private ExecutorService executor;
@@ -115,7 +124,6 @@ void setUp()
115
124
@ AfterEach
116
125
void tearDown ()
117
126
{
118
- System .out .println ( driver .metrics () );
119
127
executor .shutdownNow ();
120
128
if ( driver != null )
121
129
{
@@ -136,6 +144,13 @@ void asyncApiStressTest() throws Throwable
136
144
runStressTest ( this ::launchAsyncWorkerThreads );
137
145
}
138
146
147
    /**
     * Runs the shared stress-test harness against the reactive (RxJava-style) API.
     * Requires Bolt V4, which is the first protocol version with reactive support.
     */
    @Test
    @EnabledOnNeo4jWith( Neo4jFeature.BOLT_V4 )
    void rxApiStressTest() throws Throwable
    {
        runStressTest( this::launchRxWorkerThreads );
    }

139
154
@ Test
140
155
void blockingApiBigDataTest ()
141
156
{
@@ -150,6 +165,14 @@ void asyncApiBigDataTest() throws Throwable
150
165
readNodesAsync ( driver , bookmark , BIG_DATA_TEST_NODE_COUNT );
151
166
}
152
167
168
    /**
     * Big-data round trip via the reactive API: create nodes in batched write
     * transactions, then read them all back chained on the returned bookmark.
     */
    @Test
    @EnabledOnNeo4jWith( Neo4jFeature.BOLT_V4 )
    void rxApiBigDataTest() throws Throwable
    {
        String bookmark = createNodesRx( bigDataTestBatchCount(), BIG_DATA_TEST_BATCH_SIZE, driver );
        readNodesRx( driver, bookmark, BIG_DATA_TEST_NODE_COUNT );
    }

153
176
private void runStressTest ( Function <C ,List <Future <?>>> threadLauncher ) throws Throwable
154
177
{
155
178
C context = createContext ();
@@ -252,6 +275,71 @@ private Future<Void> launchBlockingWorkerThread( ExecutorService executor, List<
252
275
} );
253
276
}
254
277
278
+ private List <Future <?>> launchRxWorkerThreads ( C context )
279
+ {
280
+ List <RxCommand <C >> commands = createRxCommands ();
281
+ List <Future <?>> futures = new ArrayList <>();
282
+
283
+ for ( int i = 0 ; i < THREAD_COUNT ; i ++ )
284
+ {
285
+ Future <Void > future = launchRxWorkerThread ( executor , commands , context );
286
+ futures .add ( future );
287
+ }
288
+ return futures ;
289
+ }
290
+
291
+ private List <RxCommand <C >> createRxCommands ()
292
+ {
293
+ return Arrays .asList (
294
+ new RxReadQuery <>( driver , false ),
295
+ new RxReadQuery <>( driver , true ),
296
+
297
+ new RxWriteQuery <>( this , driver , false ),
298
+ new RxWriteQuery <>( this , driver , true ),
299
+
300
+ new RxReadQueryInTx <>( driver , false ),
301
+ new RxReadQueryInTx <>( driver , true ),
302
+
303
+ new RxWriteQueryInTx <>( this , driver , false ),
304
+ new RxWriteQueryInTx <>( this , driver , true ),
305
+
306
+ new RxReadQueryWithRetries <>( driver , false ),
307
+ new RxReadQueryWithRetries <>( driver , false ),
308
+
309
+ new RxWriteQueryWithRetries <>( this , driver , false ),
310
+ new RxWriteQueryWithRetries <>( this , driver , true ),
311
+
312
+ new RxFailingQuery <>( driver ),
313
+ new RxFailingQueryInTx <>( driver ),
314
+ new RxFailingQueryWithRetries <>( driver )
315
+ );
316
+ }
317
+
318
+ private Future <Void > launchRxWorkerThread ( ExecutorService executor , List <RxCommand <C >> commands , C context )
319
+ {
320
+ return executor .submit ( () ->
321
+ {
322
+ while ( !context .isStopped () )
323
+ {
324
+ CompletableFuture <Void > allCommands = executeRxCommands ( context , commands , ASYNC_BATCH_SIZE );
325
+ assertNull ( allCommands .get () );
326
+ }
327
+ return null ;
328
+ } );
329
+ }
330
+
331
+ private CompletableFuture <Void > executeRxCommands ( C context , List <RxCommand <C >> commands , int count )
332
+ {
333
+ CompletableFuture <Void >[] executions = new CompletableFuture [count ];
334
+ for ( int i = 0 ; i < count ; i ++ )
335
+ {
336
+ RxCommand <C > command = randomOf ( commands );
337
+ CompletionStage <Void > execution = command .execute ( context );
338
+ executions [i ] = execution .toCompletableFuture ();
339
+ }
340
+ return CompletableFuture .allOf ( executions );
341
+ }
342
+
255
343
private List <Future <?>> launchAsyncWorkerThreads ( C context )
256
344
{
257
345
List <AsyncCommand <C >> commands = createAsyncCommands ();
@@ -529,6 +617,57 @@ private static void readNodesAsync( Driver driver, String bookmark, int expected
529
617
System .out .println ( "Reading nodes with async API took: " + NANOSECONDS .toMillis ( end - start ) + "ms" );
530
618
}
531
619
620
    /**
     * Creates test nodes through the reactive API, one write transaction per batch,
     * and returns the bookmark to chain a follow-up read onto.
     *
     * @param batchCount number of write transactions to run
     * @param batchSize  nodes created per transaction
     * @param driver     driver to open the reactive session on
     * @return the session's last bookmark after all writes committed
     */
    private String createNodesRx( int batchCount, int batchSize, InternalDriver driver )
    {
        long start = System.nanoTime();

        // NOTE(review): session is never explicitly closed here — confirm that is
        // intended (blockLast drains the pipeline, but the session itself stays open).
        RxSession session = driver.rxSession();

        // map() builds one writeTransaction publisher per batch index; concat()
        // subscribes to them strictly one after another, so batches run sequentially.
        Flux.concat( Flux.range( 0, batchCount ).map( batchIndex ->
                session.writeTransaction( tx -> createNodesInTxRx( tx, batchIndex, batchSize ) )
        ) ).blockLast( DEFAULT_BLOCKING_TIME_OUT ); // throw any error if happened

        long end = System.nanoTime();
        System.out.println( "Node creation with reactive API took: " + NANOSECONDS.toMillis( end - start ) + "ms" );

        return session.lastBookmark();
    }
635
+
636
    /**
     * Builds a publisher that creates {@code batchSize} nodes inside the given
     * transaction, with globally unique indices derived from the batch index.
     * Statements are subscribed sequentially via concat; {@code then()} discards
     * each result summary so the publisher only signals completion or error.
     */
    private Publisher<Void> createNodesInTxRx( RxTransaction tx, int batchIndex, int batchSize )
    {
        return Flux.concat( Flux.range( 0, batchSize ).map( index -> batchIndex * batchSize + index ).map( nodeIndex -> {
            Statement statement = createNodeInTxStatement( nodeIndex );
            return Flux.from( tx.run( statement ).summary() ).then(); // As long as there is no error
        } ) );
    }
643
+
644
+ private void readNodesRx ( InternalDriver driver , String bookmark , int expectedNodeCount )
645
+ {
646
+ long start = System .nanoTime ();
647
+
648
+ RxSession session = driver .rxSession ( t -> t .withBookmarks ( bookmark ) );
649
+ AtomicInteger nodesSeen = new AtomicInteger ();
650
+
651
+ Publisher <Void > readQuery = session .readTransaction ( tx -> Flux .from ( tx .run ( "MATCH (n:Node) RETURN n" ).records () ).doOnNext ( record -> {
652
+ Node node = record .get ( 0 ).asNode ();
653
+ nodesSeen .incrementAndGet ();
654
+
655
+ List <String > labels = Iterables .asList ( node .labels () );
656
+ assertEquals ( 2 , labels .size () );
657
+ assertTrue ( labels .contains ( "Test" ) );
658
+ assertTrue ( labels .contains ( "Node" ) );
659
+
660
+ verifyNodeProperties ( node );
661
+ } ).then () );
662
+
663
+ Flux .from ( readQuery ).blockLast ( DEFAULT_BLOCKING_TIME_OUT );
664
+
665
+ assertEquals ( expectedNodeCount , nodesSeen .get () );
666
+
667
+ long end = System .nanoTime ();
668
+ System .out .println ( "Reading nodes with async API took: " + NANOSECONDS .toMillis ( end - start ) + "ms" );
669
+ }
670
+
532
671
private static Void createNodesInTx ( Transaction tx , int batchIndex , int batchSize )
533
672
{
534
673
for ( int index = 0 ; index < batchSize ; index ++ )
0 commit comments