@@ -114,52 +114,10 @@ public PooledConnection acquireConnection( AccessMode mode )
114
114
public CompletionStage <AsyncConnection > acquireAsyncConnection ( AccessMode mode )
115
115
{
116
116
return freshRoutingTable ( mode )
117
- .thenCompose ( routingTable -> acquire ( mode , routingTable ) )
117
+ .thenCompose ( routingTable -> acquireAsync ( mode , routingTable ) )
118
118
.thenApply ( connection -> new RoutingAsyncConnection ( connection , mode , this ) );
119
119
}
120
120
121
- private CompletionStage <AsyncConnection > acquire ( AccessMode mode , RoutingTable routingTable )
122
- {
123
- AddressSet addresses = addressSet ( mode , routingTable );
124
- CompletableFuture <AsyncConnection > result = new CompletableFuture <>();
125
- acquire ( mode , addresses , result );
126
- return result ;
127
- }
128
-
129
- private void acquire ( AccessMode mode , AddressSet addresses , CompletableFuture <AsyncConnection > result )
130
- {
131
- BoltServerAddress address = selectAddressAsync ( mode , addresses );
132
-
133
- if ( address == null )
134
- {
135
- result .completeExceptionally ( new SessionExpiredException (
136
- "Failed to obtain connection towards " + mode + " server. " +
137
- "Known routing table is: " + routingTable ) );
138
- return ;
139
- }
140
-
141
- asyncConnectionPool .acquire ( address ).whenComplete ( ( connection , error ) ->
142
- {
143
- if ( error != null )
144
- {
145
- if ( error instanceof ServiceUnavailableException )
146
- {
147
- log .error ( "Failed to obtain a connection towards address " + address , error );
148
- forget ( address );
149
- eventExecutorGroup .next ().execute ( () -> acquire ( mode , addresses , result ) );
150
- }
151
- else
152
- {
153
- result .completeExceptionally ( error );
154
- }
155
- }
156
- else
157
- {
158
- result .complete ( connection );
159
- }
160
- } );
161
- }
162
-
163
121
@ Override
164
122
public void onConnectionFailure ( BoltServerAddress address )
165
123
{
@@ -294,6 +252,48 @@ private synchronized void clusterCompositionLookupFailed( Throwable error )
294
252
routingTableFuture .completeExceptionally ( error );
295
253
}
296
254
255
+ private CompletionStage <AsyncConnection > acquireAsync ( AccessMode mode , RoutingTable routingTable )
256
+ {
257
+ AddressSet addresses = addressSet ( mode , routingTable );
258
+ CompletableFuture <AsyncConnection > result = new CompletableFuture <>();
259
+ acquireAsync ( mode , addresses , result );
260
+ return result ;
261
+ }
262
+
263
+ private void acquireAsync ( AccessMode mode , AddressSet addresses , CompletableFuture <AsyncConnection > result )
264
+ {
265
+ BoltServerAddress address = selectAddressAsync ( mode , addresses );
266
+
267
+ if ( address == null )
268
+ {
269
+ result .completeExceptionally ( new SessionExpiredException (
270
+ "Failed to obtain connection towards " + mode + " server. " +
271
+ "Known routing table is: " + routingTable ) );
272
+ return ;
273
+ }
274
+
275
+ asyncConnectionPool .acquire ( address ).whenComplete ( ( connection , error ) ->
276
+ {
277
+ if ( error != null )
278
+ {
279
+ if ( error instanceof ServiceUnavailableException )
280
+ {
281
+ log .error ( "Failed to obtain a connection towards address " + address , error );
282
+ forget ( address );
283
+ eventExecutorGroup .next ().execute ( () -> acquireAsync ( mode , addresses , result ) );
284
+ }
285
+ else
286
+ {
287
+ result .completeExceptionally ( error );
288
+ }
289
+ }
290
+ else
291
+ {
292
+ result .complete ( connection );
293
+ }
294
+ } );
295
+ }
296
+
297
297
private static AddressSet addressSet ( AccessMode mode , RoutingTable routingTable )
298
298
{
299
299
switch ( mode )
0 commit comments