@@ -66,6 +66,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
66
66
}
67
67
};
68
68
private static final int MIN_SERVERS = 1 ;
69
+ private static final int CONNECTION_RETRIES = 3 ;
69
70
private final ConnectionPool connections ;
70
71
private final BiFunction <Connection ,Logger ,Session > sessionProvider ;
71
72
private final Clock clock ;
@@ -110,7 +111,6 @@ private Set<BoltServerAddress> forgetAllServers()
110
111
seen .addAll ( routingServers );
111
112
seen .addAll ( readServers );
112
113
seen .addAll ( writeServers );
113
- routingServers .clear ();
114
114
readServers .clear ();
115
115
writeServers .clear ();
116
116
return seen ;
@@ -138,11 +138,11 @@ private void getServers()
138
138
{
139
139
boolean success = false ;
140
140
141
- ConcurrentRoundRobinSet <BoltServerAddress > routers = new ConcurrentRoundRobinSet <>( routingServers );
141
+ final Set <BoltServerAddress > newRouters = new HashSet <>( );
142
142
final Set <BoltServerAddress > seen = forgetAllServers ();
143
- while ( !routers .isEmpty () && !success )
143
+ while ( !routingServers .isEmpty () && !success )
144
144
{
145
- address = routers .hop ();
145
+ address = routingServers .hop ();
146
146
success = call ( address , GET_SERVERS , new Consumer <Record >()
147
147
{
148
148
@ Override
@@ -162,12 +162,19 @@ public void accept( Record record )
162
162
writeServers .addAll ( server .addresses () );
163
163
break ;
164
164
case "ROUTE" :
165
- routingServers .addAll ( server .addresses () );
165
+ newRouters .addAll ( server .addresses () );
166
166
break ;
167
167
}
168
168
}
169
169
}
170
170
} );
171
+ //We got trough but server gave us an empty list of routers
172
+ if (success && newRouters .isEmpty ()) {
173
+ success = false ;
174
+ } else if (success ) {
175
+ routingServers .clear ();
176
+ routingServers .addAll ( newRouters );
177
+ }
171
178
}
172
179
if ( !success )
173
180
{
@@ -249,7 +256,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
249
256
recorder .accept ( records .next () );
250
257
}
251
258
}
252
- catch ( ConnectionFailureException e )
259
+ catch ( Throwable e )
253
260
{
254
261
forget ( address );
255
262
return false ;
@@ -306,18 +313,36 @@ public void onWriteFailure( BoltServerAddress address )
306
313
307
314
private Connection acquireConnection ( AccessMode role )
308
315
{
309
- //Potentially rediscover servers if we are not happy with our current knowledge
310
- checkServers ();
311
-
316
+ ConcurrentRoundRobinSet <BoltServerAddress > servers ;
312
317
switch ( role )
313
318
{
314
319
case READ :
315
- return connections .acquire ( readServers .hop () );
320
+ servers = readServers ;
321
+ break ;
316
322
case WRITE :
317
- return connections .acquire ( writeServers .hop () );
323
+ servers = writeServers ;
324
+ break ;
318
325
default :
319
326
throw new ClientException ( role + " is not supported for creating new sessions" );
320
327
}
328
+
329
+ //Potentially rediscover servers if we are not happy with our current knowledge
330
+ checkServers ();
331
+ int numberOfServers = servers .size ();
332
+ for ( int i = 0 ; i < numberOfServers ; i ++ )
333
+ {
334
+ BoltServerAddress address = servers .hop ();
335
+ try
336
+ {
337
+ return connections .acquire ( address );
338
+ }
339
+ catch ( ConnectionFailureException e )
340
+ {
341
+ forget ( address );
342
+ }
343
+ }
344
+
345
+ throw new ConnectionFailureException ( "Failed to connect to any servers" );
321
346
}
322
347
323
348
@ Override
0 commit comments