37
37
import java .util .concurrent .ThreadFactory ;
38
38
import java .util .concurrent .ThreadLocalRandom ;
39
39
import java .util .concurrent .TimeUnit ;
40
- import java .util .concurrent .atomic .AtomicBoolean ;
40
+ import java .util .concurrent .atomic .AtomicLong ;
41
41
42
42
import org .neo4j .driver .v1 .AccessMode ;
43
43
import org .neo4j .driver .v1 .AuthToken ;
@@ -89,7 +89,7 @@ public void setUp() throws Exception
89
89
{
90
90
URI clusterUri = clusterRule .getClusterUri ();
91
91
AuthToken authToken = clusterRule .getAuthToken ();
92
- Config config = Config .build ().withLogging ( DEV_NULL_LOGGING ).toConfig ();
92
+ Config config = Config .build ().withLogging ( DEV_NULL_LOGGING ).withMaxIdleSessions ( THREAD_COUNT ). toConfig ();
93
93
driver = GraphDatabase .driver ( clusterUri , authToken , config );
94
94
95
95
ThreadFactory threadFactory = new DaemonThreadFactory ( getClass ().getSimpleName () + "-worker-" );
@@ -109,11 +109,11 @@ public void tearDown() throws Exception
109
109
@ Test
110
110
public void basicStressTest () throws Throwable
111
111
{
112
- AtomicBoolean stop = new AtomicBoolean ();
113
- List <Future <?>> resultFutures = launchWorkerThreads ( stop );
112
+ Context context = new Context ();
113
+ List <Future <?>> resultFutures = launchWorkerThreads ( context );
114
114
115
115
long openFileDescriptors = sleepAndGetOpenFileDescriptorCount ();
116
- stop . set ( true );
116
+ context . stop ( );
117
117
118
118
Throwable firstError = null ;
119
119
for ( Future <?> future : resultFutures )
@@ -134,16 +134,17 @@ public void basicStressTest() throws Throwable
134
134
}
135
135
136
136
assertNoFileDescriptorLeak ( openFileDescriptors );
137
+ assertExpectedNumberOfNodesCreated ( context .getCreatedNodesCount () );
137
138
}
138
139
139
- private List <Future <?>> launchWorkerThreads ( AtomicBoolean stop )
140
+ private List <Future <?>> launchWorkerThreads ( Context context )
140
141
{
141
142
List <Command > commands = createCommands ();
142
143
List <Future <?>> futures = new ArrayList <>();
143
144
144
145
for ( int i = 0 ; i < THREAD_COUNT ; i ++ )
145
146
{
146
- Future <Void > future = launchWorkerThread ( executor , commands , stop );
147
+ Future <Void > future = launchWorkerThread ( executor , commands , context );
147
148
futures .add ( future );
148
149
}
149
150
@@ -155,9 +156,11 @@ private List<Command> createCommands()
155
156
List <Command > commands = new ArrayList <>();
156
157
157
158
commands .add ( new ReadQuery ( driver ) );
158
- commands .add ( new ReadQueryInTx ( driver ) );
159
+ commands .add ( new ReadQueryInTx ( driver , false ) );
160
+ commands .add ( new ReadQueryInTx ( driver , true ) );
159
161
commands .add ( new WriteQuery ( driver ) );
160
- commands .add ( new WriteQueryInTx ( driver ) );
162
+ commands .add ( new WriteQueryInTx ( driver , false ) );
163
+ commands .add ( new WriteQueryInTx ( driver , true ) );
161
164
commands .add ( new WriteQueryUsingReadSession ( driver ) );
162
165
commands .add ( new WriteQueryUsingReadSessionInTx ( driver ) );
163
166
commands .add ( new FailedAuth ( clusterRule .getClusterUri () ) );
@@ -166,18 +169,20 @@ private List<Command> createCommands()
166
169
}
167
170
168
171
private static Future <Void > launchWorkerThread ( final ExecutorService executor , final List <Command > commands ,
169
- final AtomicBoolean stop )
172
+ final Context context )
170
173
{
171
174
return executor .submit ( new Callable <Void >()
172
175
{
176
+ final ThreadLocalRandom random = ThreadLocalRandom .current ();
177
+
173
178
@ Override
174
179
public Void call () throws Exception
175
180
{
176
- while ( !stop . get () )
181
+ while ( !context . isStopped () )
177
182
{
178
- int randomCommandIdx = ThreadLocalRandom . current () .nextInt ( commands .size () );
183
+ int randomCommandIdx = random .nextInt ( commands .size () );
179
184
Command command = commands .get ( randomCommandIdx );
180
- command .execute ();
185
+ command .execute ( context );
181
186
}
182
187
return null ;
183
188
}
@@ -202,6 +207,18 @@ private void assertNoFileDescriptorLeak( long previousOpenFileDescriptors )
202
207
currentOpenFileDescriptorCount , lessThanOrEqualTo ( maxOpenFileDescriptors ) );
203
208
}
204
209
210
+ private void assertExpectedNumberOfNodesCreated ( long expectedCount )
211
+ {
212
+ try ( Session session = driver .session () )
213
+ {
214
+ List <Record > records = session .run ( "MATCH (n) RETURN count(n) AS nodesCount" ).list ();
215
+ assertEquals ( 1 , records .size () );
216
+ Record record = records .get ( 0 );
217
+ long actualCount = record .get ( "nodesCount" ).asLong ();
218
+ assertEquals ( "Unexpected number of nodes in the database" , expectedCount , actualCount );
219
+ }
220
+ }
221
+
205
222
private static long getOpenFileDescriptorCount ()
206
223
{
207
224
try
@@ -227,9 +244,46 @@ private static Throwable withSuppressed( Throwable firstError, Throwable newErro
227
244
return firstError ;
228
245
}
229
246
247
+ private static class Context
248
+ {
249
+ volatile boolean stopped ;
250
+ volatile String bookmark ;
251
+ final AtomicLong createdNodesCount = new AtomicLong ();
252
+
253
+ boolean isStopped ()
254
+ {
255
+ return stopped ;
256
+ }
257
+
258
+ void stop ()
259
+ {
260
+ this .stopped = true ;
261
+ }
262
+
263
+ String getBookmark ()
264
+ {
265
+ return bookmark ;
266
+ }
267
+
268
+ void setBookmark ( String bookmark )
269
+ {
270
+ this .bookmark = bookmark ;
271
+ }
272
+
273
+ void nodeCreated ()
274
+ {
275
+ createdNodesCount .incrementAndGet ();
276
+ }
277
+
278
+ long getCreatedNodesCount ()
279
+ {
280
+ return createdNodesCount .get ();
281
+ }
282
+ }
283
+
230
284
private interface Command
231
285
{
232
- void execute ();
286
+ void execute ( Context context );
233
287
}
234
288
235
289
private static abstract class BaseQuery implements Command
@@ -240,6 +294,19 @@ private static abstract class BaseQuery implements Command
240
294
{
241
295
this .driver = driver ;
242
296
}
297
+
298
+ Transaction beginTx ( Session session , Context context , boolean useBookmark )
299
+ {
300
+ if ( useBookmark )
301
+ {
302
+ String bookmark = context .getBookmark ();
303
+ if ( bookmark != null )
304
+ {
305
+ return session .beginTransaction ( bookmark );
306
+ }
307
+ }
308
+ return session .beginTransaction ();
309
+ }
243
310
}
244
311
245
312
private static class ReadQuery extends BaseQuery
@@ -250,7 +317,7 @@ private static class ReadQuery extends BaseQuery
250
317
}
251
318
252
319
@ Override
253
- public void execute ()
320
+ public void execute ( Context context )
254
321
{
255
322
try ( Session session = driver .session ( AccessMode .READ ) )
256
323
{
@@ -268,16 +335,19 @@ public void execute()
268
335
269
336
private static class ReadQueryInTx extends BaseQuery
270
337
{
271
- ReadQueryInTx ( Driver driver )
338
+ final boolean useBookmark ;
339
+
340
+ ReadQueryInTx ( Driver driver , boolean useBookmark )
272
341
{
273
342
super ( driver );
343
+ this .useBookmark = useBookmark ;
274
344
}
275
345
276
346
@ Override
277
- public void execute ()
347
+ public void execute ( Context context )
278
348
{
279
349
try ( Session session = driver .session ( AccessMode .READ );
280
- Transaction tx = session . beginTransaction ( ) )
350
+ Transaction tx = beginTx ( session , context , useBookmark ) )
281
351
{
282
352
StatementResult result = tx .run ( "MATCH (n) RETURN n LIMIT 1" );
283
353
List <Record > records = result .list ();
@@ -287,6 +357,7 @@ public void execute()
287
357
Node node = record .get ( 0 ).asNode ();
288
358
assertNotNull ( node );
289
359
}
360
+ tx .success ();
290
361
}
291
362
}
292
363
}
@@ -299,34 +370,44 @@ private static class WriteQuery extends BaseQuery
299
370
}
300
371
301
372
@ Override
302
- public void execute ()
373
+ public void execute ( Context context )
303
374
{
304
375
StatementResult result ;
305
376
try ( Session session = driver .session ( AccessMode .WRITE ) )
306
377
{
307
378
result = session .run ( "CREATE ()" );
308
379
}
309
380
assertEquals ( 1 , result .summary ().counters ().nodesCreated () );
381
+ context .nodeCreated ();
310
382
}
311
383
}
312
384
313
385
private static class WriteQueryInTx extends BaseQuery
314
386
{
315
- WriteQueryInTx ( Driver driver )
387
+ final boolean useBookmark ;
388
+
389
+ WriteQueryInTx ( Driver driver , boolean useBookmark )
316
390
{
317
391
super ( driver );
392
+ this .useBookmark = useBookmark ;
318
393
}
319
394
320
395
@ Override
321
- public void execute ()
396
+ public void execute ( Context context )
322
397
{
323
398
StatementResult result ;
324
- try ( Session session = driver .session ( AccessMode .WRITE );
325
- Transaction tx = session .beginTransaction () )
399
+ try ( Session session = driver .session ( AccessMode .WRITE ) )
326
400
{
327
- result = tx .run ( "CREATE ()" );
401
+ try ( Transaction tx = beginTx ( session , context , useBookmark ) )
402
+ {
403
+ result = tx .run ( "CREATE ()" );
404
+ tx .success ();
405
+ }
406
+
407
+ context .setBookmark ( session .lastBookmark () );
328
408
}
329
409
assertEquals ( 1 , result .summary ().counters ().nodesCreated () );
410
+ context .nodeCreated ();
330
411
}
331
412
}
332
413
@@ -338,7 +419,7 @@ private static class WriteQueryUsingReadSession extends BaseQuery
338
419
}
339
420
340
421
@ Override
341
- public void execute ()
422
+ public void execute ( Context context )
342
423
{
343
424
StatementResult result = null ;
344
425
try
@@ -366,7 +447,7 @@ private static class WriteQueryUsingReadSessionInTx extends BaseQuery
366
447
}
367
448
368
449
@ Override
369
- public void execute ()
450
+ public void execute ( Context context )
370
451
{
371
452
StatementResult result = null ;
372
453
try
@@ -375,6 +456,7 @@ public void execute()
375
456
Transaction tx = session .beginTransaction () )
376
457
{
377
458
result = tx .run ( "CREATE ()" );
459
+ tx .success ();
378
460
}
379
461
fail ( "Exception expected" );
380
462
}
@@ -397,7 +479,7 @@ private static class FailedAuth implements Command
397
479
}
398
480
399
481
@ Override
400
- public void execute ()
482
+ public void execute ( Context context )
401
483
{
402
484
Logger logger = mock ( Logger .class );
403
485
Logging logging = mock ( Logging .class );
0 commit comments