17
17
package org .neo4j .driver .internal .bolt .routedimpl ;
18
18
19
19
import static java .lang .String .format ;
20
- import static org .neo4j .driver .internal .bolt .routedimpl .util .LockUtil .executeWithLock ;
21
20
22
21
import java .time .Clock ;
23
22
import java .util .ArrayList ;
30
29
import java .util .concurrent .CompletableFuture ;
31
30
import java .util .concurrent .CompletionStage ;
32
31
import java .util .concurrent .atomic .AtomicReference ;
33
- import java .util .concurrent .locks .ReentrantLock ;
34
32
import java .util .function .Consumer ;
35
33
import java .util .function .Function ;
36
34
import java .util .function .Supplier ;
@@ -71,7 +69,6 @@ public class RoutedBoltConnectionProvider implements BoltConnectionProvider {
71
69
"Failed to obtain a connection towards address %s, will try other addresses if available. Complete failure is reported separately from this entry." ;
72
70
private final LoggingProvider logging ;
73
71
private final System .Logger log ;
74
- private final ReentrantLock lock = new ReentrantLock ();
75
72
private final Supplier <BoltConnectionProvider > boltConnectionProviderSupplier ;
76
73
77
74
private final Map <BoltServerAddress , BoltConnectionProvider > addressToProvider = new HashMap <>();
@@ -85,8 +82,6 @@ public class RoutedBoltConnectionProvider implements BoltConnectionProvider {
85
82
private Rediscovery rediscovery ;
86
83
private RoutingTableRegistry registry ;
87
84
88
- private BoltServerAddress address ;
89
-
90
85
private RoutingContext routingContext ;
91
86
private BoltAgent boltAgent ;
92
87
private String userAgent ;
@@ -107,28 +102,21 @@ public RoutedBoltConnectionProvider(
107
102
this .resolver = Objects .requireNonNull (resolver );
108
103
this .logging = Objects .requireNonNull (logging );
109
104
this .log = logging .getLog (getClass ());
110
- this .loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy (
111
- (addr ) -> {
112
- synchronized (this ) {
113
- return addressToInUseCount .getOrDefault (address , 0 );
114
- }
115
- },
116
- logging );
105
+ this .loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy (this ::getInUseCount , logging );
117
106
this .domainNameResolver = Objects .requireNonNull (domainNameResolver );
118
107
this .routingTablePurgeDelayMs = routingTablePurgeDelayMs ;
119
108
this .rediscovery = rediscovery ;
120
109
this .clock = Objects .requireNonNull (clock );
121
110
}
122
111
123
112
@ Override
124
- public CompletionStage <Void > init (
113
+ public synchronized CompletionStage <Void > init (
125
114
BoltServerAddress address ,
126
115
RoutingContext routingContext ,
127
116
BoltAgent boltAgent ,
128
117
String userAgent ,
129
118
int connectTimeoutMillis ,
130
119
MetricsListener metricsListener ) {
131
- this .address = address ;
132
120
this .routingContext = routingContext ;
133
121
this .boltAgent = boltAgent ;
134
122
this .userAgent = userAgent ;
@@ -154,10 +142,12 @@ public CompletionStage<BoltConnection> connect(
154
142
BoltProtocolVersion minVersion ,
155
143
NotificationConfig notificationConfig ,
156
144
Consumer <DatabaseName > databaseNameConsumer ) {
145
+ RoutingTableRegistry registry ;
157
146
synchronized (this ) {
158
147
if (closeFuture != null ) {
159
148
return CompletableFuture .failedFuture (new IllegalStateException ("Connection provider is closed." ));
160
149
}
150
+ registry = this .registry ;
161
151
}
162
152
163
153
var handlerRef = new AtomicReference <RoutingTableHandler >();
@@ -196,6 +186,10 @@ public CompletionStage<BoltConnection> connect(
196
186
197
187
@ Override
198
188
public CompletionStage <Void > verifyConnectivity (SecurityPlan securityPlan , Map <String , Value > authMap ) {
189
+ RoutingTableRegistry registry ;
190
+ synchronized (this ) {
191
+ registry = this .registry ;
192
+ }
199
193
return supportsMultiDb (securityPlan , authMap )
200
194
.thenCompose (supports -> registry .ensureRoutingTable (
201
195
securityPlan ,
@@ -244,7 +238,7 @@ private synchronized void shutdownUnusedProviders(Set<BoltServerAddress> address
244
238
while (iterator .hasNext ()) {
245
239
var entry = iterator .next ();
246
240
var address = entry .getKey ();
247
- if (!addressesToRetain .contains (address ) && addressToInUseCount . getOrDefault (address , 0 ) == 0 ) {
241
+ if (!addressesToRetain .contains (address ) && getInUseCount (address ) == 0 ) {
248
242
entry .getValue ().close ();
249
243
iterator .remove ();
250
244
}
@@ -256,8 +250,12 @@ private CompletionStage<Boolean> detectFeature(
256
250
Map <String , Value > authMap ,
257
251
String baseErrorMessagePrefix ,
258
252
Function <BoltConnection , Boolean > featureDetectionFunction ) {
259
- List <BoltServerAddress > addresses ;
253
+ Rediscovery rediscovery ;
254
+ synchronized (this ) {
255
+ rediscovery = this .rediscovery ;
256
+ }
260
257
258
+ List <BoltServerAddress > addresses ;
261
259
try {
262
260
addresses = rediscovery .resolve ();
263
261
} catch (Throwable error ) {
@@ -390,11 +388,7 @@ private void acquire(
390
388
result .completeExceptionally (error );
391
389
}
392
390
} else {
393
- synchronized (this ) {
394
- var inUse = addressToInUseCount .getOrDefault (address , 0 );
395
- inUse ++;
396
- addressToInUseCount .put (address , inUse );
397
- }
391
+ addInUseCount (address );
398
392
result .complete (connection );
399
393
}
400
394
});
@@ -414,43 +408,51 @@ private static List<BoltServerAddress> getAddressesByMode(AccessMode mode, Routi
414
408
};
415
409
}
416
410
417
- synchronized void decreaseCount (BoltServerAddress address ) {
418
- var inUse = addressToInUseCount .get (address );
419
- if (inUse != null ) {
420
- inUse --;
421
- if (inUse <= 0 ) {
422
- addressToInUseCount .remove (address );
411
+ private synchronized int getInUseCount (BoltServerAddress address ) {
412
+ return addressToInUseCount .getOrDefault (address , 0 );
413
+ }
414
+
415
+ private synchronized void addInUseCount (BoltServerAddress address ) {
416
+ addressToInUseCount .merge (address , 1 , Integer ::sum );
417
+ }
418
+
419
+ synchronized void removeInUseCount (BoltServerAddress address ) {
420
+ addressToInUseCount .compute (address , (ignored , value ) -> {
421
+ if (value == null ) {
422
+ return null ;
423
423
} else {
424
- addressToInUseCount .put (address , inUse );
424
+ value --;
425
+ return value > 0 ? value : null ;
425
426
}
426
- }
427
+ });
427
428
}
428
-
429
429
@ Override
430
430
public CompletionStage <Void > close () {
431
431
CompletableFuture <Void > closeFuture ;
432
432
synchronized (this ) {
433
433
if (this .closeFuture == null ) {
434
- var futures = executeWithLock (lock , () -> addressToProvider .values ().stream ()
435
- .map (BoltConnectionProvider ::close )
436
- .map (CompletionStage ::toCompletableFuture )
437
- .toArray (CompletableFuture []::new ));
434
+ @ SuppressWarnings ({"rawtypes" , "RedundantSuppression" })
435
+ var futures = new CompletableFuture [addressToProvider .size ()];
436
+ var iterator = addressToProvider .values ().iterator ();
437
+ var index = 0 ;
438
+ while (iterator .hasNext ()) {
439
+ futures [index ++] = iterator .next ().close ().toCompletableFuture ();
440
+ iterator .remove ();
441
+ }
438
442
this .closeFuture = CompletableFuture .allOf (futures );
439
443
}
440
444
closeFuture = this .closeFuture ;
441
445
}
442
446
return closeFuture ;
443
447
}
444
448
445
- private BoltConnectionProvider get (BoltServerAddress address ) {
446
- return executeWithLock (lock , () -> {
447
- var provider = addressToProvider .get (address );
448
- if (provider == null ) {
449
- provider = boltConnectionProviderSupplier .get ();
450
- provider .init (address , routingContext , boltAgent , userAgent , connectTimeoutMillis , metricsListener );
451
- addressToProvider .put (address , provider );
452
- }
453
- return provider ;
454
- });
449
+ private synchronized BoltConnectionProvider get (BoltServerAddress address ) {
450
+ var provider = addressToProvider .get (address );
451
+ if (provider == null ) {
452
+ provider = boltConnectionProviderSupplier .get ();
453
+ provider .init (address , routingContext , boltAgent , userAgent , connectTimeoutMillis , metricsListener );
454
+ addressToProvider .put (address , provider );
455
+ }
456
+ return provider ;
455
457
}
456
458
}
0 commit comments