28
28
import java .lang .reflect .Method ;
29
29
import java .net .URI ;
30
30
import java .util .ArrayList ;
31
+ import java .util .HashMap ;
31
32
import java .util .HashSet ;
32
33
import java .util .List ;
34
+ import java .util .Map ;
33
35
import java .util .Set ;
34
36
import java .util .concurrent .Callable ;
35
37
import java .util .concurrent .ConcurrentHashMap ;
38
+ import java .util .concurrent .ConcurrentMap ;
36
39
import java .util .concurrent .ExecutorService ;
37
40
import java .util .concurrent .Executors ;
38
41
import java .util .concurrent .Future ;
42
45
import java .util .concurrent .atomic .AtomicLong ;
43
46
44
47
import org .neo4j .driver .internal .logging .DevNullLogger ;
48
+ import org .neo4j .driver .internal .util .ServerVersion ;
45
49
import org .neo4j .driver .v1 .AccessMode ;
46
50
import org .neo4j .driver .v1 .AuthToken ;
47
51
import org .neo4j .driver .v1 .Config ;
57
61
import org .neo4j .driver .v1 .exceptions .SecurityException ;
58
62
import org .neo4j .driver .v1 .types .Node ;
59
63
import org .neo4j .driver .v1 .util .DaemonThreadFactory ;
64
+ import org .neo4j .driver .v1 .util .cc .ClusterMemberRole ;
60
65
import org .neo4j .driver .v1 .util .cc .LocalOrRemoteClusterRule ;
61
66
62
67
import static java .util .Collections .newSetFromMap ;
68
+ import static org .hamcrest .Matchers .both ;
63
69
import static org .hamcrest .Matchers .containsString ;
64
70
import static org .hamcrest .Matchers .equalTo ;
71
+ import static org .hamcrest .Matchers .greaterThan ;
72
+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
65
73
import static org .hamcrest .Matchers .instanceOf ;
66
74
import static org .hamcrest .Matchers .lessThanOrEqualTo ;
67
75
import static org .junit .Assert .assertEquals ;
71
79
import static org .junit .Assert .fail ;
72
80
import static org .neo4j .driver .internal .util .Iterables .single ;
73
81
import static org .neo4j .driver .v1 .AuthTokens .basic ;
82
+ import static org .neo4j .driver .v1 .util .cc .ClusterMember .SIMPLE_SCHEME ;
74
83
75
84
public class CausalClusteringStressIT
76
85
{
@@ -138,6 +147,7 @@ public void basicStressTest() throws Throwable
138
147
assertNoFileDescriptorLeak ( resourcesInfo .openFileDescriptorCount );
139
148
assertNoLoggersLeak ( resourcesInfo .acquiredLoggerNames );
140
149
assertExpectedNumberOfNodesCreated ( context .getCreatedNodesCount () );
150
+ assertGoodReadQueryDistribution ( context .getReadQueriesByServer () );
141
151
}
142
152
143
153
private List <Future <?>> launchWorkerThreads ( Context context )
@@ -235,6 +245,96 @@ private void assertExpectedNumberOfNodesCreated( long expectedCount )
235
245
}
236
246
}
237
247
248
+ private void assertGoodReadQueryDistribution ( Map <String ,Long > readQueriesByServer )
249
+ {
250
+ ClusterAddresses clusterAddresses = fetchClusterAddresses ( driver );
251
+
252
+ // before 3.2.0 only read replicas serve reads
253
+ boolean readsOnFollowersEnabled = ServerVersion .version ( driver ).greaterThanOrEqual ( ServerVersion .v3_2_0 );
254
+
255
+ if ( readsOnFollowersEnabled )
256
+ {
257
+ // expect all followers to serve more than zero read queries
258
+ assertAllAddressesServedReadQueries ( "Follower" , clusterAddresses .followers , readQueriesByServer );
259
+ }
260
+
261
+ // expect all read replicas to serve more than zero read queries
262
+ assertAllAddressesServedReadQueries ( "Read replica" , clusterAddresses .readReplicas , readQueriesByServer );
263
+
264
+ if ( readsOnFollowersEnabled )
265
+ {
266
+ // expect all followers to serve same order of magnitude read queries
267
+ assertAllAddressesServedSimilarAmountOfReadQueries ( "Followers" , clusterAddresses .followers ,
268
+ readQueriesByServer , clusterAddresses );
269
+ }
270
+
271
+ // expect all read replicas to serve same order of magnitude read queries
272
+ assertAllAddressesServedSimilarAmountOfReadQueries ( "Read replicas" , clusterAddresses .readReplicas ,
273
+ readQueriesByServer , clusterAddresses );
274
+ }
275
+
276
+ private static ClusterAddresses fetchClusterAddresses ( Driver driver )
277
+ {
278
+ Set <String > followers = new HashSet <>();
279
+ Set <String > readReplicas = new HashSet <>();
280
+
281
+ try ( Session session = driver .session () )
282
+ {
283
+ List <Record > records = session .run ( "CALL dbms.cluster.overview()" ).list ();
284
+ for ( Record record : records )
285
+ {
286
+ List <Object > addresses = record .get ( "addresses" ).asList ();
287
+ String boltAddress = ((String ) addresses .get ( 0 )).replace ( SIMPLE_SCHEME , "" );
288
+
289
+ ClusterMemberRole role = ClusterMemberRole .valueOf ( record .get ( "role" ).asString () );
290
+ if ( role == ClusterMemberRole .FOLLOWER )
291
+ {
292
+ followers .add ( boltAddress );
293
+ }
294
+ else if ( role == ClusterMemberRole .READ_REPLICA )
295
+ {
296
+ readReplicas .add ( boltAddress );
297
+ }
298
+ }
299
+ }
300
+
301
+ return new ClusterAddresses ( followers , readReplicas );
302
+ }
303
+
304
+ private static void assertAllAddressesServedReadQueries ( String addressType , Set <String > addresses ,
305
+ Map <String ,Long > readQueriesByServer )
306
+ {
307
+ for ( String address : addresses )
308
+ {
309
+ Long queries = readQueriesByServer .get ( address );
310
+ assertThat ( addressType + " did not serve any read queries" , queries , greaterThan ( 0L ) );
311
+ }
312
+ }
313
+
314
+ private static void assertAllAddressesServedSimilarAmountOfReadQueries ( String addressesType , Set <String > addresses ,
315
+ Map <String ,Long > readQueriesByServer , ClusterAddresses allAddresses )
316
+ {
317
+ long expectedOrderOfMagnitude = -1 ;
318
+ for ( String address : addresses )
319
+ {
320
+ long queries = readQueriesByServer .get ( address );
321
+ long orderOfMagnitude = orderOfMagnitude ( queries );
322
+ if ( expectedOrderOfMagnitude == -1 )
323
+ {
324
+ expectedOrderOfMagnitude = orderOfMagnitude ;
325
+ }
326
+ else
327
+ {
328
+ assertThat ( addressesType + " are expected to serve similar amount of queries. " +
329
+ "Addresses: " + allAddresses + ", " +
330
+ "read queries served: " + readQueriesByServer ,
331
+ orderOfMagnitude ,
332
+ both ( greaterThanOrEqualTo ( expectedOrderOfMagnitude - 1 ) )
333
+ .and ( lessThanOrEqualTo ( expectedOrderOfMagnitude + 1 ) ) );
334
+ }
335
+ }
336
+ }
337
+
238
338
private static long getOpenFileDescriptorCount ()
239
339
{
240
340
try
@@ -260,11 +360,23 @@ private static Throwable withSuppressed( Throwable firstError, Throwable newErro
260
360
return firstError ;
261
361
}
262
362
363
+ private static long orderOfMagnitude ( long number )
364
+ {
365
+ long result = 1 ;
366
+ while ( number >= 10 )
367
+ {
368
+ number /= 10 ;
369
+ result ++;
370
+ }
371
+ return result ;
372
+ }
373
+
263
374
private static class Context
264
375
{
265
376
volatile boolean stopped ;
266
377
volatile String bookmark ;
267
378
final AtomicLong createdNodesCount = new AtomicLong ();
379
+ final ConcurrentMap <String ,AtomicLong > readQueriesByServer = new ConcurrentHashMap <>();
268
380
269
381
boolean isStopped ()
270
382
{
@@ -295,6 +407,33 @@ long getCreatedNodesCount()
295
407
{
296
408
return createdNodesCount .get ();
297
409
}
410
+
411
+ void readCompleted ( StatementResult result )
412
+ {
413
+ String serverAddress = result .summary ().server ().address ();
414
+
415
+ AtomicLong count = readQueriesByServer .get ( serverAddress );
416
+ if ( count == null )
417
+ {
418
+ count = new AtomicLong ();
419
+ AtomicLong existingCounter = readQueriesByServer .putIfAbsent ( serverAddress , count );
420
+ if ( existingCounter != null )
421
+ {
422
+ count = existingCounter ;
423
+ }
424
+ }
425
+ count .incrementAndGet ();
426
+ }
427
+
428
+ Map <String ,Long > getReadQueriesByServer ()
429
+ {
430
+ Map <String ,Long > result = new HashMap <>();
431
+ for ( Map .Entry <String ,AtomicLong > entry : readQueriesByServer .entrySet () )
432
+ {
433
+ result .put ( entry .getKey (), entry .getValue ().get () );
434
+ }
435
+ return result ;
436
+ }
298
437
}
299
438
300
439
private interface Command
@@ -343,6 +482,8 @@ public void execute( Context context )
343
482
Node node = record .get ( 0 ).asNode ();
344
483
assertNotNull ( node );
345
484
}
485
+
486
+ context .readCompleted ( result );
346
487
}
347
488
}
348
489
}
@@ -368,6 +509,8 @@ public void execute( Context context )
368
509
Node node = record .get ( 0 ).asNode ();
369
510
assertNotNull ( node );
370
511
}
512
+
513
+ context .readCompleted ( result );
371
514
tx .success ();
372
515
}
373
516
}
@@ -529,9 +672,30 @@ public Logger getLog( String name )
529
672
return DevNullLogger .DEV_NULL_LOGGER ;
530
673
}
531
674
532
- public Set <String > getAcquiredLoggerNames ()
675
+ Set <String > getAcquiredLoggerNames ()
533
676
{
534
677
return new HashSet <>( acquiredLoggerNames );
535
678
}
536
679
}
680
+
681
+ private static class ClusterAddresses
682
+ {
683
+ final Set <String > followers ;
684
+ final Set <String > readReplicas ;
685
+
686
+ ClusterAddresses ( Set <String > followers , Set <String > readReplicas )
687
+ {
688
+ this .followers = followers ;
689
+ this .readReplicas = readReplicas ;
690
+ }
691
+
692
+ @ Override
693
+ public String toString ()
694
+ {
695
+ return "ClusterAddresses{" +
696
+ "followers=" + followers +
697
+ ", readReplicas=" + readReplicas +
698
+ '}' ;
699
+ }
700
+ }
537
701
}
0 commit comments