27
27
import java .util .ArrayList ;
28
28
import java .util .Arrays ;
29
29
import java .util .List ;
30
- import java .util .Map ;
31
- import java .util .Set ;
32
30
import java .util .concurrent .Callable ;
33
31
import java .util .concurrent .CompletionStage ;
34
32
import java .util .concurrent .CountDownLatch ;
55
53
import org .neo4j .driver .async .AsyncSession ;
56
54
import org .neo4j .driver .async .ResultCursor ;
57
55
import org .neo4j .driver .exceptions .ClientException ;
58
- import org .neo4j .driver .exceptions .Neo4jException ;
59
56
import org .neo4j .driver .exceptions .ServiceUnavailableException ;
60
57
import org .neo4j .driver .exceptions .SessionExpiredException ;
61
58
import org .neo4j .driver .integration .NestedQueries ;
62
- import org .neo4j .driver .internal .BoltServerAddress ;
63
59
import org .neo4j .driver .internal .cluster .RoutingSettings ;
64
60
import org .neo4j .driver .internal .retry .RetrySettings ;
65
61
import org .neo4j .driver .internal .security .SecurityPlanImpl ;
66
62
import org .neo4j .driver .internal .util .FailingConnectionDriverFactory ;
67
63
import org .neo4j .driver .internal .util .FakeClock ;
68
- import org .neo4j .driver .internal .util .ServerVersion ;
69
64
import org .neo4j .driver .internal .util .ThrowingMessageEncoder ;
70
65
import org .neo4j .driver .internal .util .io .ChannelTrackingDriverFactory ;
71
66
import org .neo4j .driver .summary .ResultSummary ;
72
67
import org .neo4j .driver .util .cc .Cluster ;
73
68
import org .neo4j .driver .util .cc .ClusterExtension ;
74
69
import org .neo4j .driver .util .cc .ClusterMember ;
75
- import org .neo4j .driver .util .cc .ClusterMemberRole ;
76
- import org .neo4j .driver .util .cc .ClusterMemberRoleDiscoveryFactory ;
77
- import org .neo4j .driver .util .cc .ClusterMemberRoleDiscoveryFactory .ClusterMemberRoleDiscovery ;
78
70
79
71
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
80
72
import static java .util .concurrent .TimeUnit .MINUTES ;
96
88
import static org .neo4j .driver .internal .InternalBookmark .parse ;
97
89
import static org .neo4j .driver .internal .logging .DevNullLogging .DEV_NULL_LOGGING ;
98
90
import static org .neo4j .driver .internal .util .Matchers .connectionAcquisitionTimeoutError ;
99
- import static org .neo4j .driver .internal .util .Neo4jFeature .BOLT_V3 ;
100
91
import static org .neo4j .driver .util .DaemonThreadFactory .daemon ;
101
92
import static org .neo4j .driver .util .TestUtil .await ;
102
93
import static org .neo4j .driver .util .TestUtil .awaitAllFutures ;
@@ -481,61 +472,6 @@ RoutingSettings.DEFAULT, RetrySettings.DEFAULT, configWithoutLogging(), Security
481
472
}
482
473
}
483
474
484
- @ Test
485
- void shouldRediscoverWhenConnectionsToAllCoresBreak ()
486
- {
487
- Cluster cluster = clusterRule .getCluster ();
488
-
489
- ChannelTrackingDriverFactory driverFactory = new ChannelTrackingDriverFactory ();
490
- try ( Driver driver = driverFactory .newInstance ( cluster .getRoutingUri (), clusterRule .getDefaultAuthToken (),
491
- RoutingSettings .DEFAULT , RetrySettings .DEFAULT , configWithoutLogging (), SecurityPlanImpl .insecure () ) )
492
- {
493
- String database = "neo4j" ;
494
- try ( Session session = driver .session ( builder ().withDatabase ( database ).build () ) )
495
- {
496
- createNode ( session , "Person" , "name" , "Vision" );
497
-
498
- // force driver to connect to every cluster member
499
- for ( int i = 0 ; i < cluster .members ().size (); i ++ )
500
- {
501
- assertEquals ( 1 , countNodes ( session , "Person" , "name" , "Vision" ) );
502
- }
503
- }
504
-
505
- // now driver should have connections towards every cluster member
506
- // make all those connections throw and seem broken
507
- makeAllChannelsFailToRunQueries ( driverFactory , ServerVersion .version ( driver ) );
508
-
509
- // observe that connection towards writer is broken
510
- try ( Session session = driver .session ( builder ().withDatabase ( database ).withDefaultAccessMode ( AccessMode .WRITE ).build () ) )
511
- {
512
- SessionExpiredException e = assertThrows ( SessionExpiredException .class ,
513
- () -> runCreateNode ( session , "Person" , "name" , "Vision" ).consume () );
514
- assertEquals ( "Disconnected" , e .getCause ().getMessage () );
515
- }
516
-
517
- // probe connections to all readers
518
- int readersCount = cluster .followers ().size () + cluster .readReplicas ().size ();
519
- for ( int i = 0 ; i < readersCount ; i ++ )
520
- {
521
- try ( Session session = driver .session ( builder ().withDatabase ( database ).withDefaultAccessMode ( AccessMode .READ ).build () ) )
522
- {
523
- runCountNodes ( session , "Person" , "name" , "Vision" );
524
- }
525
- catch ( Throwable ignore )
526
- {
527
- }
528
- }
529
-
530
- try ( Session session = driver .session ( builder ().withDatabase ( database ).build () ) )
531
- {
532
- updateNode ( session , "Person" , "name" , "Vision" , "Thanos" );
533
- assertEquals ( 0 , countNodes ( session , "Person" , "name" , "Vision" ) );
534
- assertEquals ( 1 , countNodes ( session , "Person" , "name" , "Thanos" ) );
535
- }
536
- }
537
- }
538
-
539
475
@ Test
540
476
void shouldKeepOperatingWhenConnectionsBreak () throws Exception
541
477
{
@@ -672,52 +608,6 @@ private <T> T inExpirableSession( Driver driver, Function<Driver,Session> acquir
672
608
throw new TimeoutException ( "Transaction did not succeed in time" );
673
609
}
674
610
675
- private void ensureNodeVisible ( Cluster cluster , String name , Bookmark bookmark )
676
- {
677
- for ( ClusterMember member : cluster .members () )
678
- {
679
- int count = countNodesUsingDirectDriver ( member , name , bookmark );
680
- assertEquals ( 1 , count );
681
- }
682
- }
683
-
684
- private int countNodesUsingDirectDriver ( ClusterMember member , final String name , Bookmark bookmark )
685
- {
686
- Driver driver = clusterRule .getCluster ().getDirectDriver ( member );
687
- try ( Session session = driver .session ( builder ().withBookmarks ( bookmark ).build () ) )
688
- {
689
- return session .readTransaction ( tx ->
690
- {
691
- Result result = tx .run ( "MATCH (:Person {name: $name}) RETURN count(*)" ,
692
- parameters ( "name" , name ) );
693
- return result .single ().get ( 0 ).asInt ();
694
- } );
695
- }
696
- }
697
-
698
- private void awaitLeaderToStepDown ( Set <ClusterMember > cores )
699
- {
700
- long deadline = System .currentTimeMillis () + DEFAULT_TIMEOUT_MS ;
701
- ClusterOverview overview = null ;
702
- do
703
- {
704
- for ( ClusterMember core : cores )
705
- {
706
- overview = fetchClusterOverview ( core );
707
- if ( overview != null )
708
- {
709
- break ;
710
- }
711
- }
712
- }
713
- while ( !isSingleFollowerWithReadReplicas ( overview ) && System .currentTimeMillis () <= deadline );
714
-
715
- if ( System .currentTimeMillis () > deadline )
716
- {
717
- throw new IllegalStateException ( "Leader did not step down in " + DEFAULT_TIMEOUT_MS + "ms. Last seen cluster overview: " + overview );
718
- }
719
- }
720
-
721
611
private Driver createDriver ( URI boltUri )
722
612
{
723
613
return createDriver ( boltUri , configWithoutLogging () );
@@ -733,45 +623,6 @@ private Driver discoverDriver( List<URI> routingUris )
733
623
return GraphDatabase .routingDriver ( routingUris , clusterRule .getDefaultAuthToken (), configWithoutLogging () );
734
624
}
735
625
736
- private static ClusterOverview fetchClusterOverview ( ClusterMember member )
737
- {
738
- int leaderCount = 0 ;
739
- int followerCount = 0 ;
740
- int readReplicaCount = 0 ;
741
-
742
- Driver driver = clusterRule .getCluster ().getDirectDriver ( member );
743
- try
744
- {
745
- final ClusterMemberRoleDiscovery discovery = ClusterMemberRoleDiscoveryFactory .newInstance ( ServerVersion .version ( driver ) );
746
- final Map <BoltServerAddress ,ClusterMemberRole > clusterOverview = discovery .findClusterOverview ( driver );
747
- for ( BoltServerAddress address : clusterOverview .keySet () )
748
- {
749
- ClusterMemberRole role = clusterOverview .get ( address );
750
- if ( role == ClusterMemberRole .LEADER )
751
- {
752
- leaderCount ++;
753
- }
754
- else if ( role == ClusterMemberRole .FOLLOWER )
755
- {
756
- followerCount ++;
757
- }
758
- else if ( role == ClusterMemberRole .READ_REPLICA )
759
- {
760
- readReplicaCount ++;
761
- }
762
- else
763
- {
764
- throw new AssertionError ( "Unknown role: " + role );
765
- }
766
- }
767
- return new ClusterOverview ( leaderCount , followerCount , readReplicaCount );
768
- }
769
- catch ( Neo4jException ignore )
770
- {
771
- return null ;
772
- }
773
- }
774
-
775
626
private static void createNodesInDifferentThreads ( int count , final Driver driver ) throws Exception
776
627
{
777
628
final CountDownLatch beforeRunLatch = new CountDownLatch ( count );
@@ -860,16 +711,6 @@ private static void createNode( Session session, String label, String property,
860
711
} );
861
712
}
862
713
863
- private static void updateNode ( Session session , String label , String property , String oldValue , String newValue )
864
- {
865
- session .writeTransaction ( tx ->
866
- {
867
- tx .run ( "MATCH (n: " + label + '{' + property + ": $oldValue}) SET n." + property + " = $newValue" ,
868
- parameters ( "oldValue" , oldValue , "newValue" , newValue ) );
869
- return null ;
870
- } );
871
- }
872
-
873
714
private static int countNodes ( Session session , String label , String property , String value )
874
715
{
875
716
return session .readTransaction ( tx -> runCountNodes ( tx , label , property , value ) );
@@ -915,33 +756,6 @@ private static ExecutorService newExecutor()
915
756
return Executors .newCachedThreadPool ( daemon ( CausalClusteringIT .class .getSimpleName () + "-thread-" ) );
916
757
}
917
758
918
- private static boolean isSingleFollowerWithReadReplicas ( ClusterOverview overview )
919
- {
920
- if ( overview == null )
921
- {
922
- return false ;
923
- }
924
- return overview .leaderCount == 0 &&
925
- overview .followerCount == 1 &&
926
- overview .readReplicaCount == ClusterExtension .READ_REPLICA_COUNT ;
927
- }
928
-
929
- private static void makeAllChannelsFailToRunQueries ( ChannelTrackingDriverFactory driverFactory , ServerVersion dbVersion )
930
- {
931
- for ( Channel channel : driverFactory .channels () )
932
- {
933
- RuntimeException error = new ServiceUnavailableException ( "Disconnected" );
934
- if ( BOLT_V3 .availableIn ( dbVersion ) )
935
- {
936
- channel .pipeline ().addLast ( ThrowingMessageEncoder .forRunWithMetadataMessage ( error ) );
937
- }
938
- else
939
- {
940
- channel .pipeline ().addLast ( ThrowingMessageEncoder .forRunMessage ( error ) );
941
- }
942
- }
943
- }
944
-
945
759
private static class RecordAndSummary
946
760
{
947
761
final Record record ;
@@ -953,28 +767,4 @@ private static class RecordAndSummary
953
767
this .summary = summary ;
954
768
}
955
769
}
956
-
957
- private static class ClusterOverview
958
- {
959
- final int leaderCount ;
960
- final int followerCount ;
961
- final int readReplicaCount ;
962
-
963
- ClusterOverview ( int leaderCount , int followerCount , int readReplicaCount )
964
- {
965
- this .leaderCount = leaderCount ;
966
- this .followerCount = followerCount ;
967
- this .readReplicaCount = readReplicaCount ;
968
- }
969
-
970
- @ Override
971
- public String toString ()
972
- {
973
- return "ClusterOverview{" +
974
- "leaderCount=" + leaderCount +
975
- ", followerCount=" + followerCount +
976
- ", readReplicaCount=" + readReplicaCount +
977
- '}' ;
978
- }
979
- }
980
770
}
0 commit comments