Skip to content

Commit 57eb7ea

Browse files
committed
Updating stress tests to improve stability when testing clustered setup
1 parent d44eadc commit 57eb7ea

13 files changed

+611
-130
lines changed

driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.lang.reflect.Method;
3232
import java.net.URI;
3333
import java.util.ArrayList;
34-
import java.util.Arrays;
34+
import java.util.Collections;
3535
import java.util.HashMap;
3636
import java.util.HashSet;
3737
import java.util.List;
@@ -159,7 +159,7 @@ void asyncApiBigDataTest() throws Throwable
159159
}
160160

161161
@Test
162-
void rxApiBigDataTest() throws Throwable
162+
void rxApiBigDataTest()
163163
{
164164
assertRxIsAvailable();
165165
Bookmark bookmark = createNodesRx( bigDataTestBatchCount(), BIG_DATA_TEST_BATCH_SIZE, driver );
@@ -221,7 +221,17 @@ Config config()
221221

222222
abstract C createContext();
223223

224-
abstract List<BlockingCommand<C>> createTestSpecificBlockingCommands();
224+
List<BlockingCommand<C>> createTestSpecificBlockingCommands() {
225+
return Collections.emptyList();
226+
}
227+
228+
List<AsyncCommand<C>> createTestSpecificAsyncCommands() {
229+
return Collections.emptyList();
230+
}
231+
232+
List<RxCommand<C>> createTestSpecificRxCommands() {
233+
return Collections.emptyList();
234+
}
225235

226236
abstract boolean handleWriteFailure( Throwable error, C context );
227237

@@ -245,23 +255,15 @@ private List<BlockingCommand<C>> createBlockingCommands()
245255
{
246256
List<BlockingCommand<C>> commands = new ArrayList<>();
247257

248-
commands.add( new BlockingReadQuery<>( driver, false ) );
249-
commands.add( new BlockingReadQuery<>( driver, true ) );
250-
251-
commands.add( new BlockingReadQueryInTx<>( driver, false ) );
252-
commands.add( new BlockingReadQueryInTx<>( driver, true ) );
258+
commands.add( new BlockingReadQueryWithRetries<>( driver, false ) );
259+
commands.add( new BlockingReadQueryWithRetries<>( driver, true ) );
253260

254-
commands.add( new BlockingWriteQuery<>( this, driver, false ) );
255-
commands.add( new BlockingWriteQuery<>( this, driver, true ) );
261+
commands.add( new BlockingWriteQueryWithRetries<>( this, driver, false ) );
262+
commands.add( new BlockingWriteQueryWithRetries<>( this, driver, true ) );
256263

257-
commands.add( new BlockingWriteQueryInTx<>( this, driver, false ) );
258-
commands.add( new BlockingWriteQueryInTx<>( this, driver, true ) );
264+
commands.add( new BlockingWrongQueryWithRetries<>( driver ) );
259265

260-
commands.add( new BlockingWrongQuery<>( driver ) );
261-
commands.add( new BlockingWrongQueryInTx<>( driver ) );
262-
263-
commands.add( new BlockingFailingQuery<>( driver ) );
264-
commands.add( new BlockingFailingQueryInTx<>( driver ) );
266+
commands.add( new BlockingFailingQueryWithRetries<>( driver ) );
265267

266268
commands.add( new FailedAuth<>( databaseUri(), config() ) );
267269

@@ -299,29 +301,19 @@ private List<Future<?>> launchRxWorkerThreads( C context )
299301

300302
private List<RxCommand<C>> createRxCommands()
301303
{
302-
return Arrays.asList(
303-
new RxReadQuery<>( driver, false ),
304-
new RxReadQuery<>( driver, true ),
305-
306-
new RxWriteQuery<>( this, driver, false ),
307-
new RxWriteQuery<>( this, driver, true ),
304+
List<RxCommand<C>> commands = new ArrayList<>();
308305

309-
new RxReadQueryInTx<>( driver, false ),
310-
new RxReadQueryInTx<>( driver, true ),
306+
commands.add( new RxReadQueryWithRetries<>( driver, false ) );
307+
commands.add( new RxReadQueryWithRetries<>( driver, true ) );
311308

312-
new RxWriteQueryInTx<>( this, driver, false ),
313-
new RxWriteQueryInTx<>( this, driver, true ),
309+
commands.add( new RxWriteQueryWithRetries<>( this, driver, false ) );
310+
commands.add( new RxWriteQueryWithRetries<>( this, driver, true ) );
314311

315-
new RxReadQueryWithRetries<>( driver, false ),
316-
new RxReadQueryWithRetries<>( driver, false ),
312+
commands.add( new RxFailingQueryWithRetries<>( driver ) );
317313

318-
new RxWriteQueryWithRetries<>( this, driver, false ),
319-
new RxWriteQueryWithRetries<>( this, driver, true ),
314+
commands.addAll( createTestSpecificRxCommands() );
320315

321-
new RxFailingQuery<>( driver ),
322-
new RxFailingQueryInTx<>( driver ),
323-
new RxFailingQueryWithRetries<>( driver )
324-
);
316+
return commands;
325317
}
326318

327319
private Future<Void> launchRxWorkerThread( ExecutorService executor, List<RxCommand<C>> commands, C context )
@@ -367,23 +359,15 @@ private List<AsyncCommand<C>> createAsyncCommands()
367359
{
368360
List<AsyncCommand<C>> commands = new ArrayList<>();
369361

370-
commands.add( new AsyncReadQuery<>( driver, false ) );
371-
commands.add( new AsyncReadQuery<>( driver, true ) );
372-
373-
commands.add( new AsyncReadQueryInTx<>( driver, false ) );
374-
commands.add( new AsyncReadQueryInTx<>( driver, true ) );
375-
376-
commands.add( new AsyncWriteQuery<>( this, driver, false ) );
377-
commands.add( new AsyncWriteQuery<>( this, driver, true ) );
362+
commands.add( new AsyncReadQueryWithRetries<>( driver, false ) );
363+
commands.add( new AsyncReadQueryWithRetries<>( driver, true ) );
378364

379-
commands.add( new AsyncWriteQueryInTx<>( this, driver, false ) );
380-
commands.add( new AsyncWriteQueryInTx<>( this, driver, true ) );
365+
commands.add( new AsyncWriteQueryWithRetries<>( this, driver, false ) );
366+
commands.add( new AsyncWriteQueryWithRetries<>( this, driver, true ) );
381367

382-
commands.add( new AsyncWrongQuery<>( driver ) );
383-
commands.add( new AsyncWrongQueryInTx<>( driver ) );
368+
commands.add( new AsyncWrongQueryWithRetries<>( driver ) );
384369

385-
commands.add( new AsyncFailingQuery<>( driver ) );
386-
commands.add( new AsyncFailingQueryInTx<>( driver ) );
370+
commands.add( new AsyncFailingQueryWithRetries<>( driver ) );
387371

388372
return commands;
389373
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.AccessMode;
24+
import org.neo4j.driver.Driver;
25+
import org.neo4j.driver.async.AsyncSession;
26+
import org.neo4j.driver.async.ResultCursor;
27+
import org.neo4j.driver.internal.util.Futures;
28+
29+
import static org.hamcrest.Matchers.is;
30+
import static org.hamcrest.junit.MatcherAssert.assertThat;
31+
import static org.junit.jupiter.api.Assertions.assertNull;
32+
import static org.neo4j.driver.internal.util.Matchers.arithmeticError;
33+
34+
public class AsyncFailingQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
35+
{
36+
public AsyncFailingQueryWithRetries( Driver driver )
37+
{
38+
super( driver, false );
39+
}
40+
41+
@Override
42+
public CompletionStage<Void> execute( C context )
43+
{
44+
AsyncSession session = newSession( AccessMode.READ, context );
45+
46+
return session.readTransactionAsync( tx -> tx.runAsync( "UNWIND [10, 5, 0] AS x RETURN 10 / x" )
47+
.thenCompose( ResultCursor::listAsync )
48+
.handle( ( records, error ) ->
49+
{
50+
session.closeAsync();
51+
52+
assertNull( records );
53+
Throwable cause = Futures.completionExceptionCause( error );
54+
assertThat( cause, is( arithmeticError() ) );
55+
56+
return null;
57+
} ));
58+
}
59+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.AccessMode;
24+
import org.neo4j.driver.Driver;
25+
import org.neo4j.driver.Record;
26+
import org.neo4j.driver.async.AsyncSession;
27+
import org.neo4j.driver.async.ResultCursor;
28+
import org.neo4j.driver.summary.ResultSummary;
29+
import org.neo4j.driver.types.Node;
30+
31+
import static org.junit.jupiter.api.Assertions.assertNotNull;
32+
33+
public class AsyncReadQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
34+
{
35+
public AsyncReadQueryWithRetries( Driver driver, boolean useBookmark )
36+
{
37+
super( driver, useBookmark );
38+
}
39+
40+
@Override
41+
public CompletionStage<Void> execute( C context )
42+
{
43+
AsyncSession session = newSession( AccessMode.READ, context );
44+
45+
CompletionStage<ResultSummary> queryFinished = session.readTransactionAsync(
46+
tx -> tx.runAsync( "MATCH (n) RETURN n LIMIT 1" )
47+
.thenCompose(
48+
cursor -> cursor.nextAsync()
49+
.thenCompose( record -> processAndGetSummary( record, cursor ) ) ) );
50+
51+
queryFinished.whenComplete( ( summary, error ) ->
52+
{
53+
if ( summary != null )
54+
{
55+
context.readCompleted( summary );
56+
}
57+
session.closeAsync();
58+
} );
59+
60+
return queryFinished.thenApply( summary -> null );
61+
}
62+
63+
private CompletionStage<ResultSummary> processAndGetSummary( Record record, ResultCursor cursor )
64+
{
65+
if ( record != null )
66+
{
67+
Node node = record.get( 0 ).asNode();
68+
assertNotNull( node );
69+
}
70+
return cursor.consumeAsync();
71+
}
72+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.stress;
20+
21+
import java.util.concurrent.CompletionStage;
22+
23+
import org.neo4j.driver.AccessMode;
24+
import org.neo4j.driver.Driver;
25+
import org.neo4j.driver.async.AsyncSession;
26+
import org.neo4j.driver.async.ResultCursor;
27+
import org.neo4j.driver.internal.util.Futures;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
31+
public class AsyncWriteQueryWithRetries<C extends AbstractContext> extends AbstractAsyncQuery<C>
32+
{
33+
private final AbstractStressTestBase<C> stressTest;
34+
35+
public AsyncWriteQueryWithRetries( AbstractStressTestBase<C> stressTest, Driver driver, boolean useBookmark )
36+
{
37+
super( driver, useBookmark );
38+
this.stressTest = stressTest;
39+
}
40+
41+
@Override
42+
public CompletionStage<Void> execute( C context )
43+
{
44+
AsyncSession session = newSession( AccessMode.WRITE, context );
45+
46+
return session.writeTransactionAsync(
47+
tx -> tx.runAsync( "CREATE ()" )
48+
.thenCompose( ResultCursor::consumeAsync ) )
49+
.handle( ( summary, error ) ->
50+
{
51+
session.closeAsync();
52+
53+
if ( error != null )
54+
{
55+
handleError( Futures.completionExceptionCause( error ), context );
56+
}
57+
else
58+
{
59+
context.setBookmark( session.lastBookmark() );
60+
assertEquals( 1, summary.counters().nodesCreated() );
61+
context.nodeCreated();
62+
}
63+
64+
return null;
65+
} );
66+
}
67+
68+
private void handleError( Throwable error, C context )
69+
{
70+
if ( !stressTest.handleWriteFailure( error, context ) )
71+
{
72+
throw new RuntimeException( error );
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)