Skip to content

Commit c8035ce

Browse files
author
Zhen Li
committed
Added stress tests for reactive sessions
1 parent feddefe commit c8035ce

13 files changed

+793
-1
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import org.neo4j.driver.AccessMode;
22+
import org.neo4j.driver.Driver;
23+
import org.neo4j.driver.reactive.RxSession;
24+
25+
public abstract class AbstractRxQuery<C extends AbstractContext> implements RxCommand<C>
26+
{
27+
protected final Driver driver;
28+
protected final boolean useBookmark;
29+
30+
public AbstractRxQuery( Driver driver, boolean useBookmark )
31+
{
32+
this.driver = driver;
33+
this.useBookmark = useBookmark;
34+
}
35+
36+
public RxSession newSession( AccessMode mode, C context )
37+
{
38+
if ( useBookmark )
39+
{
40+
return driver.rxSession( t -> t.withDefaultAccessMode( mode ).withBookmarks( context.getBookmark() ) );
41+
}
42+
return driver.rxSession( t -> t.withDefaultAccessMode( mode ) );
43+
}
44+
}

driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.lang.reflect.Method;
2929
import java.net.URI;
3030
import java.util.ArrayList;
31+
import java.util.Arrays;
3132
import java.util.HashMap;
3233
import java.util.HashSet;
3334
import java.util.List;
@@ -61,8 +62,10 @@
6162
import org.neo4j.driver.async.StatementResultCursor;
6263
import org.neo4j.driver.internal.InternalDriver;
6364
import org.neo4j.driver.internal.logging.DevNullLogger;
65+
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
6466
import org.neo4j.driver.internal.util.Futures;
6567
import org.neo4j.driver.internal.util.Iterables;
68+
import org.neo4j.driver.internal.util.Neo4jFeature;
6669
import org.neo4j.driver.types.Node;
6770
import org.neo4j.driver.util.DaemonThreadFactory;
6871

@@ -115,7 +118,6 @@ void setUp()
115118
@AfterEach
116119
void tearDown()
117120
{
118-
System.out.println( driver.metrics() );
119121
executor.shutdownNow();
120122
if ( driver != null )
121123
{
@@ -136,6 +138,13 @@ void asyncApiStressTest() throws Throwable
136138
runStressTest( this::launchAsyncWorkerThreads );
137139
}
138140

141+
@Test
142+
@EnabledOnNeo4jWith( Neo4jFeature.BOLT_V4 )
143+
void rxApiStressTest() throws Throwable
144+
{
145+
runStressTest( this::launchRxWorkerThreads );
146+
}
147+
139148
@Test
140149
void blockingApiBigDataTest()
141150
{
@@ -252,6 +261,71 @@ private Future<Void> launchBlockingWorkerThread( ExecutorService executor, List<
252261
} );
253262
}
254263

264+
private List<Future<?>> launchRxWorkerThreads( C context )
265+
{
266+
List<RxCommand<C>> commands = createRxCommands();
267+
List<Future<?>> futures = new ArrayList<>();
268+
269+
for ( int i = 0; i < THREAD_COUNT; i++ )
270+
{
271+
Future<Void> future = launchRxWorkerThread( executor, commands, context );
272+
futures.add( future );
273+
}
274+
return futures;
275+
}
276+
277+
private List<RxCommand<C>> createRxCommands()
278+
{
279+
return Arrays.asList(
280+
new RxReadQuery<>( driver, false ),
281+
new RxReadQuery<>( driver, true ),
282+
283+
new RxWriteQuery<>( this, driver, false ),
284+
new RxWriteQuery<>( this, driver, true ),
285+
286+
new RxReadQueryInTx<>( driver, false ),
287+
new RxReadQueryInTx<>( driver, true ),
288+
289+
new RxWriteQueryInTx<>( this, driver, false ),
290+
new RxWriteQueryInTx<>( this, driver, true ),
291+
292+
new RxReadQueryWithRetries<>( driver, false ),
293+
new RxReadQueryWithRetries<>( driver, false ),
294+
295+
new RxWriteQueryWithRetries<>( this, driver, false ),
296+
new RxWriteQueryWithRetries<>( this, driver, true ),
297+
298+
new RxFailingQuery<>( driver ),
299+
new RxFailingQueryInTx<>( driver ),
300+
new RxFailingQueryWithRetries<>( driver )
301+
);
302+
}
303+
304+
private Future<Void> launchRxWorkerThread( ExecutorService executor, List<RxCommand<C>> commands, C context )
305+
{
306+
return executor.submit( () ->
307+
{
308+
while ( !context.isStopped() )
309+
{
310+
CompletableFuture<Void> allCommands = executeRxCommands( context, commands, ASYNC_BATCH_SIZE );
311+
assertNull( allCommands.get() );
312+
}
313+
return null;
314+
} );
315+
}
316+
317+
private CompletableFuture<Void> executeRxCommands( C context, List<RxCommand<C>> commands, int count )
318+
{
319+
CompletableFuture<Void>[] executions = new CompletableFuture[count];
320+
for ( int i = 0; i < count; i++ )
321+
{
322+
RxCommand<C> command = randomOf( commands );
323+
CompletionStage<Void> execution = command.execute( context );
324+
executions[i] = execution.toCompletableFuture();
325+
}
326+
return CompletableFuture.allOf( executions );
327+
}
328+
255329
private List<Future<?>> launchAsyncWorkerThreads( C context )
256330
{
257331
List<AsyncCommand<C>> commands = createAsyncCommands();
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
public interface RxCommand<C extends AbstractContext>
24+
{
25+
CompletionStage<Void> execute( C context );
26+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
27+
import org.neo4j.driver.AccessMode;
28+
import org.neo4j.driver.Driver;
29+
import org.neo4j.driver.internal.util.Futures;
30+
import org.neo4j.driver.reactive.RxSession;
31+
32+
import static org.hamcrest.Matchers.either;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
import static org.hamcrest.junit.MatcherAssert.assertThat;
36+
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
37+
38+
public class RxFailingQuery<C extends AbstractContext> extends AbstractRxQuery<C>
39+
{
40+
public RxFailingQuery( Driver driver )
41+
{
42+
super( driver, false );
43+
}
44+
45+
@Override
46+
public CompletionStage<Void> execute( C context )
47+
{
48+
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
49+
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.READ, context ) ),
50+
session -> session.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).records(),
51+
RxSession::close )
52+
.subscribe( record -> {
53+
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54+
queryFinished.complete( null );
55+
}, error -> {
56+
Throwable cause = Futures.completionExceptionCause( error );
57+
assertThat( cause, is( arithmeticError() ) );
58+
queryFinished.complete( null );
59+
});
60+
return queryFinished;
61+
}
62+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import reactor.core.publisher.Flux;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CompletionStage;
25+
26+
import org.neo4j.driver.AccessMode;
27+
import org.neo4j.driver.Driver;
28+
import org.neo4j.driver.internal.util.Futures;
29+
import org.neo4j.driver.reactive.RxSession;
30+
import org.neo4j.driver.reactive.RxTransaction;
31+
32+
import static org.hamcrest.Matchers.either;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
import static org.hamcrest.junit.MatcherAssert.assertThat;
36+
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
37+
38+
public class RxFailingQueryInTx<C extends AbstractContext> extends AbstractRxQuery<C>
39+
{
40+
public RxFailingQueryInTx( Driver driver )
41+
{
42+
super( driver, false );
43+
}
44+
45+
@Override
46+
public CompletionStage<Void> execute( C context )
47+
{
48+
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
49+
RxSession session = newSession( AccessMode.READ, context );
50+
Flux.usingWhen( session.beginTransaction(),
51+
tx -> tx.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).records(),
52+
RxTransaction::commit, RxTransaction::rollback )
53+
.subscribe( record -> {
54+
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
55+
queryFinished.complete( null );
56+
}, error -> {
57+
Throwable cause = Futures.completionExceptionCause( error );
58+
assertThat( cause, is( arithmeticError() ) );
59+
queryFinished.complete( null );
60+
});
61+
return queryFinished;
62+
}
63+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
27+
import org.neo4j.driver.AccessMode;
28+
import org.neo4j.driver.Driver;
29+
import org.neo4j.driver.internal.util.Futures;
30+
import org.neo4j.driver.reactive.RxSession;
31+
32+
import static org.hamcrest.Matchers.either;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
import static org.hamcrest.junit.MatcherAssert.assertThat;
36+
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
37+
38+
public class RxFailingQueryWithRetries<C extends AbstractContext> extends AbstractRxQuery<C>
39+
{
40+
public RxFailingQueryWithRetries( Driver driver )
41+
{
42+
super( driver, false );
43+
}
44+
45+
@Override
46+
public CompletionStage<Void> execute( C context )
47+
{
48+
CompletableFuture<Void> queryFinished = new CompletableFuture<>();
49+
Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.READ, context ) ),
50+
session -> session.readTransaction( tx -> tx.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).records() ),
51+
RxSession::close )
52+
.subscribe( record -> {
53+
assertThat( record.get( 0 ).asInt(), either( equalTo( 1 ) ).or( equalTo( 2 ) ) );
54+
queryFinished.complete( null );
55+
}, error -> {
56+
Throwable cause = Futures.completionExceptionCause( error );
57+
assertThat( cause, is( arithmeticError() ) );
58+
queryFinished.complete( null );
59+
});
60+
return queryFinished;
61+
}
62+
}

0 commit comments

Comments
 (0)