Skip to content

Remove explicit fairness from DefaultConnectionPool #1575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.mongodb.internal.function.CheckedSupplier;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

Expand Down Expand Up @@ -94,54 +93,6 @@ public static void lockInterruptibly(final Lock lock) throws MongoInterruptedExc
}
}

/**
 * Runs {@code action} under {@code lock} by adapting it to a {@link Supplier} that yields {@code null}
 * and delegating to {@link #withUnfairLock(ReentrantLock, Supplier)}.
 * Read {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
 *
 * @param lock The lock to hold while {@code action} runs.
 * @param action The code to run while holding {@code lock}.
 */
public static void withUnfairLock(final ReentrantLock lock, final Runnable action) {
    // A value-returning adapter is needed so that the `Supplier` overload is the one being invoked.
    final Supplier<Void> adapter = () -> {
        action.run();
        return null;
    };
    withUnfairLock(lock, adapter);
}

/**
 * Acquires {@code lock} via {@code lockUnfair}, evaluates {@code supplier} and returns its result.
 * The {@code lock} is released before returning, whether {@code supplier} completes normally or throws.
 * Read {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method.
 *
 * @param lock The lock to hold while {@code supplier} is evaluated.
 * @param supplier Produces the value to return.
 * @param <V> The type of the produced value.
 * @return Whatever {@code supplier} returned.
 */
public static <V> V withUnfairLock(final ReentrantLock lock, final Supplier<V> supplier) {
    lockUnfair(lock);
    final V result;
    try {
        result = supplier.get();
    } finally {
        lock.unlock();
    }
    return result;
}

/**
 * Tries to take {@code lock} immediately with {@link ReentrantLock#tryLock()},
 * and falls back to the blocking {@link ReentrantLock#lock()} only if that attempt fails.
 */
private static void lockUnfair(
        // `ReentrantLock` rather than `Lock` is required here:
        // the barging (unfair) behavior is documented only for `ReentrantLock.tryLock`.
        final ReentrantLock lock) {
    final boolean acquiredImmediately = lock.tryLock();
    if (acquiredImmediately) {
        return;
    }
    lock.lock();
}

/**
 * Acquires {@code lock}, first attempting to barge in via {@link ReentrantLock#tryLock()} even if the {@code lock}
 * was created as {@linkplain ReentrantLock#ReentrantLock(boolean) fair}, and otherwise falling back to
 * {@link #lockInterruptibly(Lock)}. In most cases you should create an unfair lock instead of calling this method.
 *
 * @param lock The lock to acquire.
 * @throws MongoInterruptedException If the current thread is already interrupted when this method is called,
 * or if thrown by {@link #lockInterruptibly(Lock)}.
 */
public static void lockInterruptiblyUnfair(
        // `ReentrantLock` rather than `Lock` is required here:
        // the barging (unfair) behavior is documented only for `ReentrantLock.tryLock`.
        final ReentrantLock lock) throws MongoInterruptedException {
    final boolean alreadyInterrupted = Thread.currentThread().isInterrupted();
    if (alreadyInterrupted) {
        throw interruptAndCreateMongoInterruptedException(null, null);
    }
    if (lock.tryLock()) {
        // the immediate attempt succeeded, there is nothing to wait for
        return;
    }
    lockInterruptibly(lock);
}

// Private empty constructor: code outside this class cannot create an instance of it.
private Locks() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

Expand Down Expand Up @@ -171,9 +168,9 @@ public T get(final long timeout, final TimeUnit timeUnit) {
* and returns {@code null} instead of throwing {@link MongoTimeoutException}.
*/
@Nullable
T getImmediateUnfair() {
T getImmediate() {
T element = null;
if (stateAndPermits.acquirePermitImmediateUnfair()) {
if (stateAndPermits.acquirePermitImmediate()) {
element = available.pollLast();
if (element == null) {
stateAndPermits.releasePermit();
Expand Down Expand Up @@ -321,57 +318,26 @@ private static final class StateAndPermits {
private volatile boolean closed;
private final int maxPermits;
private volatile int permits;
/** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on
* {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly,
* to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire
* the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies
* but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest
* latencies with unfair locking. The fair approach is in accordance with the
* <a href="https://github.com/mongodb/specifications/blob/568093ce7f0e1394cf4952c417e1e7dacc5fef53/source/connection-monitoring-and-pooling/connection-monitoring-and-pooling.rst#waitqueue">
* connection pool specification</a>.
* <p>
* When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock,
* and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition},
* threads spend less time in the queue. This results in having smaller high percentiles
* of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput
* to further reduce the high percentiles by acquiring the lock fairly.</p>
* <p>
* While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in
* comments in <a href="https://jira.mongodb.org/browse/JAVA-4452">JAVA-4452</a>.</p>
* <p>
* {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it
* to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead.
* This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa.
* But it appears to be a good enough compromise, that results in having enough throughput when there are enough
* available permits and tolerable high percentiles of latencies when there are not enough available permits.</p>
* <p>
* It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters,
* but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies
* (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no
* fair locking is used. That is, this approach does not result in the behavior we want.</p>
*/
private final AtomicInteger waitersEstimate;
@Nullable
private Supplier<MongoException> causeSupplier;

StateAndPermits(final int maxPermits, final Supplier<MongoServerUnavailableException> poolClosedExceptionSupplier) {
this.poolClosedExceptionSupplier = poolClosedExceptionSupplier;
lock = new ReentrantLock(true);
lock = new ReentrantLock();
permitAvailableOrClosedOrPausedCondition = lock.newCondition();
paused = false;
closed = false;
this.maxPermits = maxPermits;
permits = maxPermits;
waitersEstimate = new AtomicInteger();
causeSupplier = null;
}

/**
 * Returns the current value of the {@code permits} field.
 * The read is done without acquiring {@code lock}.
 */
int permits() {
return permits;
}

boolean acquirePermitImmediateUnfair() {
return withUnfairLock(lock, () -> {
boolean acquirePermitImmediate() {
return withLock(lock, () -> {
throwIfClosedOrPaused();
if (permits > 0) {
//noinspection NonAtomicOperationOnVolatileField
Expand All @@ -391,17 +357,12 @@ boolean acquirePermitImmediateUnfair() {
*/
boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInterruptedException {
long remainingNanos = unit.toNanos(timeout);
if (waitersEstimate.get() == 0) {
lockInterruptiblyUnfair(lock);
} else {
lockInterruptibly(lock);
}
lockInterruptibly(lock);
try {
while (permits == 0
// the absence of short-circuiting is of importance
& !throwIfClosedOrPaused()) {
try {
waitersEstimate.incrementAndGet();
if (timeout < 0 || remainingNanos == Long.MAX_VALUE) {
permitAvailableOrClosedOrPausedCondition.await();
} else if (remainingNanos >= 0) {
Expand All @@ -411,8 +372,6 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
}
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException(null, e);
} finally {
waitersEstimate.decrementAndGet();
}
}
assertTrue(permits > 0);
Expand All @@ -425,7 +384,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
}

void releasePermit() {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
assertTrue(permits < maxPermits);
//noinspection NonAtomicOperationOnVolatileField
permits++;
Expand All @@ -434,7 +393,7 @@ void releasePermit() {
}

void pause(final Supplier<MongoException> causeSupplier) {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
if (!paused) {
this.paused = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
Expand All @@ -445,7 +404,7 @@ void pause(final Supplier<MongoException> causeSupplier) {

void ready() {
if (paused) {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
this.paused = false;
this.causeSupplier = null;
});
Expand All @@ -457,7 +416,7 @@ void ready() {
*/
boolean close() {
if (!closed) {
return withUnfairLock(lock, () -> {
return withLock(lock, () -> {
if (!closed) {
closed = true;
permitAvailableOrClosedOrPausedCondition.signalAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR;
import static com.mongodb.internal.Locks.lockInterruptibly;
import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.Locks.withUnfairLock;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE;
Expand Down Expand Up @@ -351,11 +350,11 @@ private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, fin
}

@Nullable
private PooledConnection getPooledConnectionImmediateUnfair() {
UsageTrackingInternalConnection internalConnection = pool.getImmediateUnfair();
private PooledConnection getPooledConnectionImmediate() {
UsageTrackingInternalConnection internalConnection = pool.getImmediate();
while (internalConnection != null && shouldPrune(internalConnection)) {
pool.release(internalConnection, true);
internalConnection = pool.getImmediateUnfair();
internalConnection = pool.getImmediate();
}
return internalConnection == null ? null : new PooledConnection(internalConnection);
}
Expand Down Expand Up @@ -896,7 +895,7 @@ private final class OpenConcurrencyLimiter {
private final Deque<MutableReference<PooledConnection>> desiredConnectionSlots;

OpenConcurrencyLimiter(final int maxConnecting) {
lock = new ReentrantLock(true);
lock = new ReentrantLock(false);
permitAvailableOrHandedOverOrClosedOrPausedCondition = lock.newCondition();
maxPermits = maxConnecting;
permits = maxPermits;
Expand Down Expand Up @@ -1054,11 +1053,8 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
* 2. Thread#2 checks in a connection. Tries to hand it over, but there are no threads desiring to get one.
* 3. Thread#1 executes the current code. Expresses the desire to get a connection via the hand-over mechanism,
* but thread#2 has already tried handing over and released its connection to the pool.
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool.
*
* This attempt should be unfair because the current thread (Thread#1) has already waited for its turn fairly.
* Waiting fairly again puts the current thread behind other threads, which is unfair to the current thread. */
availableConnection = getPooledConnectionImmediateUnfair();
* As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool. */
availableConnection = getPooledConnectionImmediate();
if (availableConnection != null) {
return availableConnection;
}
Expand Down Expand Up @@ -1093,7 +1089,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
}

private void releasePermit() {
withUnfairLock(lock, () -> {
withLock(lock, () -> {
assertTrue(permits < maxPermits);
permits++;
permitAvailableOrHandedOverOrClosedOrPausedCondition.signal();
Expand Down Expand Up @@ -1128,7 +1124,7 @@ private void giveUpOnTryingToGetAvailableConnection() {
* from threads that are waiting for a permit to open a connection.
*/
void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) {
boolean handedOver = withUnfairLock(lock, () -> {
boolean handedOver = withLock(lock, () -> {
for (//iterate from first (head) to last (tail)
MutableReference<PooledConnection> desiredConnectionSlot : desiredConnectionSlots) {
if (desiredConnectionSlot.reference == null) {
Expand All @@ -1145,7 +1141,7 @@ void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection)
}

void signalClosedOrPaused() {
withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
withLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll);
}
}

Expand Down