28
28
import java .util .ArrayList ;
29
29
import java .util .Arrays ;
30
30
import java .util .List ;
31
+ import java .util .Set ;
31
32
import java .util .concurrent .Callable ;
32
33
import java .util .concurrent .CompletionStage ;
33
34
import java .util .concurrent .CountDownLatch ;
58
59
import org .neo4j .driver .v1 .TransactionWork ;
59
60
import org .neo4j .driver .v1 .Values ;
60
61
import org .neo4j .driver .v1 .exceptions .ClientException ;
62
+ import org .neo4j .driver .v1 .exceptions .Neo4jException ;
61
63
import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
62
64
import org .neo4j .driver .v1 .exceptions .SessionExpiredException ;
63
65
import org .neo4j .driver .v1 .exceptions .TransientException ;
@@ -413,11 +415,12 @@ public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception
413
415
414
416
try ( Driver driver = createDriver ( leader .getRoutingUri () ) )
415
417
{
418
+ Set <ClusterMember > cores = cluster .cores ();
416
419
for ( ClusterMember follower : cluster .followers () )
417
420
{
418
421
cluster .kill ( follower );
419
422
}
420
- awaitLeaderToStepDown ( driver );
423
+ awaitLeaderToStepDown ( cores );
421
424
422
425
// now we should be unable to write because majority of cores is down
423
426
for ( int i = 0 ; i < 10 ; i ++ )
@@ -462,11 +465,12 @@ public Integer execute( Transaction tx )
462
465
463
466
ensureNodeVisible ( cluster , "Star Lord" , bookmark );
464
467
468
+ Set <ClusterMember > cores = cluster .cores ();
465
469
for ( ClusterMember follower : cluster .followers () )
466
470
{
467
471
cluster .kill ( follower );
468
472
}
469
- awaitLeaderToStepDown ( driver );
473
+ awaitLeaderToStepDown ( cores );
470
474
471
475
// now we should be unable to write because majority of cores is down
472
476
try ( Session session = driver .session ( AccessMode .WRITE ) )
@@ -913,44 +917,27 @@ public Integer execute( Transaction tx )
913
917
}
914
918
}
915
919
916
- private void awaitLeaderToStepDown ( Driver driver )
920
+ private void awaitLeaderToStepDown ( Set < ClusterMember > cores )
917
921
{
918
- int leadersCount ;
919
- int followersCount ;
920
- int readReplicasCount ;
922
+ long deadline = System .currentTimeMillis () + DEFAULT_TIMEOUT_MS ;
923
+ ClusterOverview overview = null ;
921
924
do
922
925
{
923
- try ( Session session = driver . session () )
926
+ for ( ClusterMember core : cores )
924
927
{
925
- int newLeadersCount = 0 ;
926
- int newFollowersCount = 0 ;
927
- int newReadReplicasCount = 0 ;
928
- for ( Record record : session .run ( "CALL dbms.cluster.overview()" ).list () )
928
+ overview = fetchClusterOverview ( core );
929
+ if ( overview != null )
929
930
{
930
- ClusterMemberRole role = ClusterMemberRole .valueOf ( record .get ( "role" ).asString () );
931
- if ( role == ClusterMemberRole .LEADER )
932
- {
933
- newLeadersCount ++;
934
- }
935
- else if ( role == ClusterMemberRole .FOLLOWER )
936
- {
937
- newFollowersCount ++;
938
- }
939
- else if ( role == ClusterMemberRole .READ_REPLICA )
940
- {
941
- newReadReplicasCount ++;
942
- }
943
- else
944
- {
945
- throw new AssertionError ( "Unknown role: " + role );
946
- }
931
+ break ;
947
932
}
948
- leadersCount = newLeadersCount ;
949
- followersCount = newFollowersCount ;
950
- readReplicasCount = newReadReplicasCount ;
951
933
}
952
934
}
953
- while ( !(leadersCount == 0 && followersCount == 1 && readReplicasCount == 2 ) );
935
+ while ( !isSingleFollowerWithReadReplicas ( overview ) && System .currentTimeMillis () <= deadline );
936
+
937
+ if ( System .currentTimeMillis () > deadline )
938
+ {
939
+ throw new IllegalStateException ( "Leader did not step down in " + DEFAULT_TIMEOUT_MS + "ms. Last seen cluster overview: " + overview );
940
+ }
954
941
}
955
942
956
943
private Driver createDriver ( URI boltUri )
@@ -968,6 +955,43 @@ private Driver discoverDriver( List<URI> routingUris )
968
955
return GraphDatabase .routingDriver ( routingUris , clusterRule .getDefaultAuthToken (), configWithoutLogging () );
969
956
}
970
957
958
+ private ClusterOverview fetchClusterOverview ( ClusterMember member )
959
+ {
960
+ int leaderCount = 0 ;
961
+ int followerCount = 0 ;
962
+ int readReplicaCount = 0 ;
963
+
964
+ Driver driver = clusterRule .getCluster ().getDirectDriver ( member );
965
+ try ( Session session = driver .session () )
966
+ {
967
+ for ( Record record : session .run ( "CALL dbms.cluster.overview()" ).list () )
968
+ {
969
+ ClusterMemberRole role = ClusterMemberRole .valueOf ( record .get ( "role" ).asString () );
970
+ if ( role == ClusterMemberRole .LEADER )
971
+ {
972
+ leaderCount ++;
973
+ }
974
+ else if ( role == ClusterMemberRole .FOLLOWER )
975
+ {
976
+ followerCount ++;
977
+ }
978
+ else if ( role == ClusterMemberRole .READ_REPLICA )
979
+ {
980
+ readReplicaCount ++;
981
+ }
982
+ else
983
+ {
984
+ throw new AssertionError ( "Unknown role: " + role );
985
+ }
986
+ }
987
+ return new ClusterOverview ( leaderCount , followerCount , readReplicaCount );
988
+ }
989
+ catch ( Neo4jException ignore )
990
+ {
991
+ return null ;
992
+ }
993
+ }
994
+
971
995
private static void createNodesInDifferentThreads ( int count , final Driver driver ) throws Exception
972
996
{
973
997
final CountDownLatch beforeRunLatch = new CountDownLatch ( count );
@@ -1133,6 +1157,17 @@ private static ExecutorService newExecutor()
1133
1157
return Executors .newCachedThreadPool ( daemon ( CausalClusteringIT .class .getSimpleName () + "-thread-" ) );
1134
1158
}
1135
1159
1160
+ private static boolean isSingleFollowerWithReadReplicas ( ClusterOverview overview )
1161
+ {
1162
+ if ( overview == null )
1163
+ {
1164
+ return false ;
1165
+ }
1166
+ return overview .leaderCount == 0 &&
1167
+ overview .followerCount == 1 &&
1168
+ overview .readReplicaCount == ClusterRule .READ_REPLICA_COUNT ;
1169
+ }
1170
+
1136
1171
private static class RecordAndSummary
1137
1172
{
1138
1173
final Record record ;
@@ -1144,4 +1179,28 @@ private static class RecordAndSummary
1144
1179
this .summary = summary ;
1145
1180
}
1146
1181
}
1182
+
1183
+ private static class ClusterOverview
1184
+ {
1185
+ final int leaderCount ;
1186
+ final int followerCount ;
1187
+ final int readReplicaCount ;
1188
+
1189
+ ClusterOverview ( int leaderCount , int followerCount , int readReplicaCount )
1190
+ {
1191
+ this .leaderCount = leaderCount ;
1192
+ this .followerCount = followerCount ;
1193
+ this .readReplicaCount = readReplicaCount ;
1194
+ }
1195
+
1196
+ @ Override
1197
+ public String toString ()
1198
+ {
1199
+ return "ClusterOverview{" +
1200
+ "leaderCount=" + leaderCount +
1201
+ ", followerCount=" + followerCount +
1202
+ ", readReplicaCount=" + readReplicaCount +
1203
+ '}' ;
1204
+ }
1205
+ }
1147
1206
}
0 commit comments