Skip to content

Commit 7930b51

Browse files
committed
Remove explicit fairness from DefaultConnectionPool
JAVA-5680
1 parent 241ef1c commit 7930b51

File tree

4 files changed

+13
-110
lines changed

4 files changed

+13
-110
lines changed

driver-core/src/main/com/mongodb/internal/Locks.java

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.mongodb.internal.function.CheckedSupplier;
2121

2222
import java.util.concurrent.locks.Lock;
23-
import java.util.concurrent.locks.ReentrantLock;
2423
import java.util.concurrent.locks.StampedLock;
2524
import java.util.function.Supplier;
2625

@@ -94,54 +93,6 @@ public static void lockInterruptibly(final Lock lock) throws MongoInterruptedExc
9493
}
9594
}
9695

97-
/**
98-
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
99-
*/
100-
public static void withUnfairLock(final ReentrantLock lock, final Runnable action) {
101-
withUnfairLock(lock, () -> {
102-
action.run();
103-
return null;
104-
});
105-
}
106-
107-
/**
108-
* See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
109-
*/
110-
public static <V> V withUnfairLock(final ReentrantLock lock, final Supplier<V> supplier) {
111-
lockUnfair(lock);
112-
try {
113-
return supplier.get();
114-
} finally {
115-
lock.unlock();
116-
}
117-
}
118-
119-
private static void lockUnfair(
120-
// The type must be `ReentrantLock`, not `Lock`,
121-
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
122-
final ReentrantLock lock) {
123-
if (!lock.tryLock()) {
124-
lock.lock();
125-
}
126-
}
127-
128-
/**
129-
* This method allows a thread to attempt acquiring the {@code lock} unfairly despite the {@code lock}
130-
* being {@linkplain ReentrantLock#ReentrantLock(boolean) fair}. In most cases you should create an unfair lock,
131-
* instead of using this method.
132-
*/
133-
public static void lockInterruptiblyUnfair(
134-
// The type must be `ReentrantLock`, not `Lock`,
135-
// because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior.
136-
final ReentrantLock lock) throws MongoInterruptedException {
137-
if (Thread.currentThread().isInterrupted()) {
138-
throw interruptAndCreateMongoInterruptedException(null, null);
139-
}
140-
if (!lock.tryLock()) {
141-
lockInterruptibly(lock);
142-
}
143-
}
144-
14596
private Locks() {
14697
}
14798
}

driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java

Lines changed: 8 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,16 @@
3030
import java.util.Iterator;
3131
import java.util.concurrent.ConcurrentLinkedDeque;
3232
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.atomic.AtomicInteger;
3433
import java.util.concurrent.locks.Condition;
3534
import java.util.concurrent.locks.ReentrantLock;
36-
import java.util.concurrent.locks.ReentrantReadWriteLock;
3735
import java.util.function.Consumer;
3836
import java.util.function.Supplier;
3937

4038
import static com.mongodb.assertions.Assertions.assertNotNull;
4139
import static com.mongodb.assertions.Assertions.assertTrue;
4240
import static com.mongodb.assertions.Assertions.notNull;
4341
import static com.mongodb.internal.Locks.lockInterruptibly;
44-
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
45-
import static com.mongodb.internal.Locks.withUnfairLock;
42+
import static com.mongodb.internal.Locks.withLock;
4643
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
4744
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
4845

@@ -321,48 +318,17 @@ private static final class StateAndPermits {
321318
private volatile boolean closed;
322319
private final int maxPermits;
323320
private volatile int permits;
324-
/** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
325-
* {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
326-
* to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
327-
* the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
328-
* but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
329-
* latencies with unfair locking. The fair approach is in accordance with the
330-
* <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
331-
* connection pool specification</a>.
332-
* <p>
333-
* When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
334-
* and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
335-
* threads spend less time in the queue. This results in having smaller high percentiles
336-
* of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
337-
* to further reduce the high percentiles by acquiring the lock fairly.</p>
338-
* <p>
339-
* While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
340-
* comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
341-
* <p>
342-
* {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
343-
* to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
344-
* This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
345-
* But it appears to be a good enough compromise, that results in having enough throughput when there are enough
346-
* available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
347-
* <p>
348-
* It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters,
349-
* but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
350-
* (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
351-
* fair locking is used. That is, this approach does not result in the behavior we want.</p>
352-
*/
353-
private final AtomicInteger waitersEstimate;
354321
@Nullable
355322
private Supplier<MongoException> causeSupplier;
356323

357324
StateAndPermits(final int maxPermits, final Supplier<MongoServerUnavailableException> poolClosedExceptionSupplier) {
358325
this.poolClosedExceptionSupplier = poolClosedExceptionSupplier;
359-
lock = new ReentrantLock(true);
326+
lock = new ReentrantLock();
360327
permitAvailableOrClosedOrPausedCondition = lock.newCondition();
361328
paused = false;
362329
closed = false;
363330
this.maxPermits = maxPermits;
364331
permits = maxPermits;
365-
waitersEstimate = new AtomicInteger();
366332
causeSupplier = null;
367333
}
368334

@@ -371,7 +337,7 @@ int permits() {
371337
}
372338

373339
boolean acquirePermitImmediateUnfair() {
374-
return withUnfairLock(lock, () -> {
340+
return withLock(lock, () -> {
375341
throwIfClosedOrPaused();
376342
if (permits > 0) {
377343
//noinspection NonAtomicOperationOnVolatileField
@@ -391,17 +357,12 @@ boolean acquirePermitImmediateUnfair() {
391357
*/
392358
boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInterruptedException {
393359
long remainingNanos = unit.toNanos(timeout);
394-
if (waitersEstimate.get() == 0) {
395-
lockInterruptiblyUnfair(lock);
396-
} else {
397-
lockInterruptibly(lock);
398-
}
360+
lockInterruptibly(lock);
399361
try {
400362
while (permits == 0
401363
// the absence of short-circuiting is of importance
402364
& !throwIfClosedOrPaused()) {
403365
try {
404-
waitersEstimate.incrementAndGet();
405366
if (timeout < 0 || remainingNanos == Long.MAX_VALUE) {
406367
permitAvailableOrClosedOrPausedCondition.await();
407368
} else if (remainingNanos >= 0) {
@@ -411,8 +372,6 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
411372
}
412373
} catch (InterruptedException e) {
413374
throw interruptAndCreateMongoInterruptedException(null, e);
414-
} finally {
415-
waitersEstimate.decrementAndGet();
416375
}
417376
}
418377
assertTrue(permits > 0);
@@ -425,7 +384,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
425384
}
426385

427386
void releasePermit() {
428-
withUnfairLock(lock, () -> {
387+
withLock(lock, () -> {
429388
assertTrue(permits < maxPermits);
430389
//noinspection NonAtomicOperationOnVolatileField
431390
permits++;
@@ -434,7 +393,7 @@ void releasePermit() {
434393
}
435394

436395
void pause(final Supplier<MongoException> causeSupplier) {
437-
withUnfairLock(lock, () -> {
396+
withLock(lock, () -> {
438397
if (!paused) {
439398
this.paused = true;
440399
permitAvailableOrClosedOrPausedCondition.signalAll();
@@ -445,7 +404,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
445404

446405
void ready() {
447406
if (paused) {
448-
withUnfairLock(lock, () -> {
407+
withLock(lock, () -> {
449408
this.paused = false;
450409
this.causeSupplier = null;
451410
});
@@ -457,7 +416,7 @@ void ready() {
457416
*/
458417
boolean close() {
459418
if (!closed) {
460-
return withUnfairLock(lock, () -> {
419+
return withLock(lock, () -> {
461420
if (!closed) {
462421
closed = true;
463422
permitAvailableOrClosedOrPausedCondition.signalAll();

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@
9898
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
9999
import static com.mongodb.internal.Locks.lockInterruptibly;
100100
import static com.mongodb.internal.Locks.withLock;
101-
import static com.mongodb.internal.Locks.withUnfairLock;
102101
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
103102
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
104103
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
@@ -896,7 +895,7 @@ private final class OpenConcurrencyLimiter {
896895
private final Deque<MutableReference<PooledConnection>> desiredConnectionSlots;
897896

898897
OpenConcurrencyLimiter(final int maxConnecting) {
899-
lock = new ReentrantLock(true);
898+
lock = new ReentrantLock(false);
900899
permitAvailableOrHandedOverOrClosedOrPausedCondition = lock.newCondition();
901900
maxPermits = maxConnecting;
902901
permits = maxPermits;
@@ -1054,10 +1053,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
10541053
* 2. Thread#2 checks in a connection. Tries to hand it over, but there are no threads desiring to get one.
10551054
* 3. Thread#1 executes the current code. Expresses the desire to get a connection via the hand-over mechanism,
10561055
* but thread#2 has already tried handing over and released its connection to the pool.
1057-
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool.
1058-
*
1059-
* This attempt should be unfair because the current thread (Thread#1) has already waited for its turn fairly.
1060-
* Waiting fairly again puts the current thread behind other threads, which is unfair to the current thread. */
1056+
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool. */
10611057
availableConnection = getPooledConnectionImmediateUnfair();
10621058
if (availableConnection != null) {
10631059
return availableConnection;
@@ -1093,7 +1089,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
10931089
}
10941090

10951091
private void releasePermit() {
1096-
withUnfairLock(lock, () -> {
1092+
withLock(lock, () -> {
10971093
assertTrue(permits < maxPermits);
10981094
permits++;
10991095
permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
@@ -1128,7 +1124,7 @@ private void giveUpOnTryingToGetAvailableConnection() {
11281124
* from threads that are waiting for a permit to open a connection.
11291125
*/
11301126
void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) {
1131-
boolean handedOver = withUnfairLock(lock, () -> {
1127+
boolean handedOver = withLock(lock, () -> {
11321128
for (//iterate from first (head) to last (tail)
11331129
MutableReference<PooledConnection> desiredConnectionSlot : desiredConnectionSlots) {
11341130
if (desiredConnectionSlot.reference == null) {
@@ -1145,7 +1141,7 @@ void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection)
11451141
}
11461142

11471143
void signalClosedOrPaused() {
1148-
withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
1144+
withLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
11491145
}
11501146
}
11511147

driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,6 @@ void infiniteMaxSize() {
308308

309309
@ParameterizedTest
310310
@MethodSource("concurrentUsageArguments")
311-
@Tag("Slow")
312311
public void concurrentUsage(final int minSize, final int maxSize, final boolean limitConnectionLifeIdleTime,
313312
final int concurrentUsersCount,
314313
final boolean checkoutSync, final boolean checkoutAsync,
@@ -348,7 +347,6 @@ private static Stream<Arguments> concurrentUsageArguments() {
348347
}
349348

350349
@Test
351-
@Tag("Slow")
352350
public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() throws InterruptedException, TimeoutException {
353351
int maxAvailableConnections = 7;
354352
ControllableConnectionFactory controllableConnFactory = newControllableConnectionFactory(cachedExecutor);
@@ -382,7 +380,6 @@ public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() t
382380
* since there are no permits to open available, the hand-over mechanism is the only way to get a connection.
383381
*/
384382
@Test
385-
@Tag("Slow")
386383
public void checkoutHandOverMechanism() throws InterruptedException, TimeoutException {
387384
int openConnectionsCount = 5_000;
388385
int maxConcurrentlyHandedOver = 7;

0 commit comments

Comments
 (0)