20
20
21
21
import io .netty .util .concurrent .EventExecutorGroup ;
22
22
23
+ import java .util .LinkedList ;
23
24
import java .util .List ;
24
25
import java .util .concurrent .CompletableFuture ;
25
26
import java .util .concurrent .CompletionStage ;
62
63
public class LoadBalancer implements ConnectionProvider
63
64
{
64
65
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer" ;
66
+ private static final String CONNECTION_ACQUISITION_COMPLETION_FAILURE_MESSAGE = "Connection acquisition failed for all available addresses." ;
67
+ private static final String CONNECTION_ACQUISITION_COMPLETION_EXCEPTION_MESSAGE =
68
+ "Failed to obtain connection towards %s server. Known routing table is: %s" ;
69
+ private static final String CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE =
70
+ "Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry." ;
71
+ private static final String CONNECTION_ACQUISITION_ATTEMPT_EXCEPTION_MESSAGE = "Server at %s is no longer available" ;
65
72
private final ConnectionPool connectionPool ;
66
73
private final RoutingTableRegistry routingTables ;
67
74
private final LoadBalancingStrategy loadBalancingStrategy ;
@@ -181,19 +188,23 @@ private CompletionStage<Connection> acquire( AccessMode mode, RoutingTable routi
181
188
{
182
189
AddressSet addresses = addressSet ( mode , routingTable );
183
190
CompletableFuture <Connection > result = new CompletableFuture <>();
184
- acquire ( mode , routingTable , addresses , result );
191
+ List <Throwable > attemptExceptions = new LinkedList <>();
192
+ acquire ( mode , routingTable , addresses , result , attemptExceptions );
185
193
return result ;
186
194
}
187
195
188
- private void acquire ( AccessMode mode , RoutingTable routingTable , AddressSet addresses , CompletableFuture <Connection > result )
196
+ private void acquire ( AccessMode mode , RoutingTable routingTable , AddressSet addresses , CompletableFuture <Connection > result ,
197
+ List <Throwable > attemptExceptions )
189
198
{
190
199
BoltServerAddress address = selectAddress ( mode , addresses );
191
200
192
201
if ( address == null )
193
202
{
194
- result .completeExceptionally ( new SessionExpiredException (
195
- "Failed to obtain connection towards " + mode + " server. " +
196
- "Known routing table is: " + routingTable ) );
203
+ SessionExpiredException completionError =
204
+ new SessionExpiredException ( format ( CONNECTION_ACQUISITION_COMPLETION_EXCEPTION_MESSAGE , mode , routingTable ) );
205
+ attemptExceptions .forEach ( completionError ::addSuppressed );
206
+ log .error ( CONNECTION_ACQUISITION_COMPLETION_FAILURE_MESSAGE , completionError );
207
+ result .completeExceptionally ( completionError );
197
208
return ;
198
209
}
199
210
@@ -204,10 +215,14 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add
204
215
{
205
216
if ( error instanceof ServiceUnavailableException )
206
217
{
207
- SessionExpiredException errorToLog = new SessionExpiredException ( format ( "Server at %s is no longer available" , address ), error );
208
- log .warn ( "Failed to obtain a connection towards address " + address , errorToLog );
218
+ String attemptMessage = format ( CONNECTION_ACQUISITION_ATTEMPT_FAILURE_MESSAGE , address );
219
+ log .warn ( attemptMessage );
220
+ SessionExpiredException attemptException =
221
+ new SessionExpiredException ( format ( CONNECTION_ACQUISITION_ATTEMPT_EXCEPTION_MESSAGE , address ), error );
222
+ attemptExceptions .add ( attemptException );
223
+ log .debug ( attemptMessage , attemptException );
209
224
routingTable .forget ( address );
210
- eventExecutorGroup .next ().execute ( () -> acquire ( mode , routingTable , addresses , result ) );
225
+ eventExecutorGroup .next ().execute ( () -> acquire ( mode , routingTable , addresses , result , attemptExceptions ) );
211
226
}
212
227
else
213
228
{
0 commit comments