23
23
import org .neo4j .driver .internal .spi .ConnectionPool ;
24
24
import org .neo4j .driver .internal .util .Clock ;
25
25
import org .neo4j .driver .v1 .Logger ;
26
- import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
27
26
import org .neo4j .driver .v1 .exceptions .SecurityException ;
27
+ import org .neo4j .driver .v1 .exceptions .ServiceUnavailableException ;
28
28
29
29
import static java .lang .String .format ;
30
30
@@ -47,59 +47,94 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste
47
47
48
48
// Given the current routing table and connection pool, use the connection composition provider to fetch a new
49
49
// cluster composition, which would be used to update the routing table and connection pool
50
- public ClusterComposition lookupRoutingTable ( ConnectionPool connections , RoutingTable routingTable )
51
- throws InterruptedException
50
+ public ClusterComposition lookupClusterComposition ( ConnectionPool connections , RoutingTable routingTable )
52
51
{
53
52
assertHasRouters ( routingTable );
54
53
int failures = 0 ;
55
54
56
55
for ( long start = clock .millis (), delay = 0 ; ; delay = Math .max ( settings .retryTimeoutDelay , delay * 2 ) )
57
56
{
58
57
long waitTime = start + delay - clock .millis ();
59
- if ( waitTime > 0 )
58
+ sleep ( waitTime );
59
+ start = clock .millis ();
60
+
61
+ ClusterComposition composition = lookupClusterCompositionOnKnownRouters ( connections , routingTable );
62
+ if ( composition != null )
60
63
{
61
- clock .sleep ( waitTime );
64
+ return composition ;
65
+ }
66
+
67
+ if ( ++failures >= settings .maxRoutingFailures )
68
+ {
69
+ throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
70
+ }
71
+ }
72
+ }
73
+
74
+ private ClusterComposition lookupClusterCompositionOnKnownRouters ( ConnectionPool connections ,
75
+ RoutingTable routingTable )
76
+ {
77
+ int size = routingTable .routerSize ();
78
+ for ( int i = 0 ; i < size ; i ++ )
79
+ {
80
+ BoltServerAddress address = routingTable .nextRouter ();
81
+ if ( address == null )
82
+ {
83
+ throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
62
84
}
63
- start = clock .millis ();
64
85
65
- int size = routingTable . routerSize ( );
66
- for ( int i = 0 ; i < size ; i ++ )
86
+ ClusterComposition composition = lookupClusterCompositionOnRouter ( address , connections , routingTable );
87
+ if ( composition != null )
67
88
{
68
- BoltServerAddress address = routingTable . nextRouter () ;
69
- if ( address == null )
70
- {
71
- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE ) ;
72
- }
89
+ return composition ;
90
+ }
91
+ }
92
+ return null ;
93
+ }
73
94
74
- ClusterCompositionResponse response = null ;
75
- try ( Connection connection = connections .acquire ( address ) )
76
- {
77
- response = provider .getClusterComposition ( connection );
78
- }
79
- catch ( SecurityException e )
80
- {
81
- throw e ; // terminate the discovery immediately
82
- }
83
- catch ( Exception e )
84
- {
85
- // the connection breaks
86
- logger .error ( format ( "Failed to connect to routing server '%s'." , address ), e );
87
- routingTable .removeRouter ( address );
95
+ private ClusterComposition lookupClusterCompositionOnRouter ( BoltServerAddress routerAddress ,
96
+ ConnectionPool connections , RoutingTable routingTable )
97
+ {
98
+ ClusterCompositionResponse response ;
99
+ try ( Connection connection = connections .acquire ( routerAddress ) )
100
+ {
101
+ response = provider .getClusterComposition ( connection );
102
+ }
103
+ catch ( SecurityException e )
104
+ {
105
+ // auth error happened, terminate the discovery procedure immediately
106
+ throw e ;
107
+ }
108
+ catch ( Throwable t )
109
+ {
110
+ // connection turned out to be broken
111
+ logger .error ( format ( "Failed to connect to routing server '%s'." , routerAddress ), t );
112
+ routingTable .removeRouter ( routerAddress );
113
+ return null ;
114
+ }
88
115
89
- assertHasRouters ( routingTable );
90
- continue ;
91
- }
116
+ ClusterComposition cluster = response .clusterComposition ();
117
+ logger .info ( "Got cluster composition %s" , cluster );
118
+ if ( cluster .hasWriters () )
119
+ {
120
+ return cluster ;
121
+ }
122
+ return null ;
123
+ }
92
124
93
- ClusterComposition cluster = response .clusterComposition ();
94
- logger .info ( "Got cluster composition %s" , cluster );
95
- if ( cluster .hasWriters () )
96
- {
97
- return cluster ;
98
- }
125
+ private void sleep ( long millis )
126
+ {
127
+ if ( millis > 0 )
128
+ {
129
+ try
130
+ {
131
+ clock .sleep ( millis );
99
132
}
100
- if ( ++ failures >= settings . maxRoutingFailures )
133
+ catch ( InterruptedException e )
101
134
{
102
- throw new ServiceUnavailableException ( NO_ROUTERS_AVAILABLE );
135
+ // restore the interrupted status
136
+ Thread .currentThread ().interrupt ();
137
+ throw new ServiceUnavailableException ( "Thread was interrupted while performing discovery" , e );
103
138
}
104
139
}
105
140
}
0 commit comments