diff --git a/driver-core/src/main/com/mongodb/internal/Locks.java b/driver-core/src/main/com/mongodb/internal/Locks.java index 8e8260f50d3..042fc9fd69f 100644 --- a/driver-core/src/main/com/mongodb/internal/Locks.java +++ b/driver-core/src/main/com/mongodb/internal/Locks.java @@ -20,7 +20,6 @@ import com.mongodb.internal.function.CheckedSupplier; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.StampedLock; import java.util.function.Supplier; @@ -94,54 +93,6 @@ public static void lockInterruptibly(final Lock lock) throws MongoInterruptedExc } } - /** - * See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method. - */ - public static void withUnfairLock(final ReentrantLock lock, final Runnable action) { - withUnfairLock(lock, () -> { - action.run(); - return null; - }); - } - - /** - * See {@link #lockInterruptiblyUnfair(ReentrantLock)} before using this method. - */ - public static V withUnfairLock(final ReentrantLock lock, final Supplier supplier) { - lockUnfair(lock); - try { - return supplier.get(); - } finally { - lock.unlock(); - } - } - - private static void lockUnfair( - // The type must be `ReentrantLock`, not `Lock`, - // because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior. - final ReentrantLock lock) { - if (!lock.tryLock()) { - lock.lock(); - } - } - - /** - * This method allows a thread to attempt acquiring the {@code lock} unfairly despite the {@code lock} - * being {@linkplain ReentrantLock#ReentrantLock(boolean) fair}. In most cases you should create an unfair lock, - * instead of using this method. - */ - public static void lockInterruptiblyUnfair( - // The type must be `ReentrantLock`, not `Lock`, - // because only `ReentrantLock.tryLock` is documented to have the barging (unfair) behavior. - final ReentrantLock lock) throws MongoInterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw interruptAndCreateMongoInterruptedException(null, null); - } - if (!lock.tryLock()) { - lockInterruptibly(lock); - } - } - private Locks() { } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java index fe3ac129631..26ba9c4f34d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java @@ -30,10 +30,8 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Supplier; @@ -41,8 +39,7 @@ import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.Locks.lockInterruptibly; -import static com.mongodb.internal.Locks.lockInterruptiblyUnfair; -import static com.mongodb.internal.Locks.withUnfairLock; +import static com.mongodb.internal.Locks.withLock; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; @@ -171,9 +168,9 @@ public T get(final long timeout, final TimeUnit timeUnit) { * and returns {@code null} instead of throwing {@link MongoTimeoutException}. */ @Nullable - T getImmediateUnfair() { + T getImmediate() { T element = null; - if (stateAndPermits.acquirePermitImmediateUnfair()) { + if (stateAndPermits.acquirePermitImmediate()) { element = available.pollLast(); if (element == null) { stateAndPermits.releasePermit(); @@ -321,48 +318,17 @@ private static final class StateAndPermits { private volatile boolean closed; private final int maxPermits; private volatile int permits; - /** When there are not enough available permits to serve all threads requesting a permit, threads are queued and wait on - * {@link #permitAvailableOrClosedOrPausedCondition}. Because of this waiting, we want threads to acquire the lock fairly, - * to avoid a situation when some threads are sitting in the queue for a long time while others barge in and acquire - * the lock without waiting in the queue. Fair locking reduces high percentiles of {@link #acquirePermit(long, TimeUnit)} latencies - * but reduces its throughput: it makes latencies roughly equally high for everyone, while keeping them lower than the highest - * latencies with unfair locking. The fair approach is in accordance with the - * - * connection pool specification. - *

- * When there are enough available permits to serve all threads requesting a permit, threads still have to acquire the lock, - * and still are queued, but since they are not waiting on {@link #permitAvailableOrClosedOrPausedCondition}, - * threads spend less time in the queue. This results in having smaller high percentiles - * of {@link #acquirePermit(long, TimeUnit)} latencies, and we do not want to sacrifice the throughput - * to further reduce the high percentiles by acquiring the lock fairly.

- *

- * While there is a chance that the expressed reasoning is flawed, it is supported by the results of experiments reported in - * comments in JAVA-4452.

- *

- * {@link ReentrantReadWriteLock#hasWaiters(Condition)} requires holding the lock to be called, therefore we cannot use it - * to discriminate between the two cases described above, and we use {@link #waitersEstimate} instead. - * This approach results in sometimes acquiring a lock unfairly when it should have been acquired fairly, and vice versa. - * But it appears to be a good enough compromise, that results in having enough throughput when there are enough - * available permits and tolerable high percentiles of latencies when there are not enough available permits.

- *

- * It may seem viable to use {@link #permits} > 0 as a way to decide that there are likely no waiters, - * but benchmarking shows that with this approach high percentiles of contended {@link #acquirePermit(long, TimeUnit)} latencies - * (when the number of threads that use the pool is higher than the maximum pool size) become similar to a situation when no - * fair locking is used. That is, this approach does not result in the behavior we want.

- */ - private final AtomicInteger waitersEstimate; @Nullable private Supplier causeSupplier; StateAndPermits(final int maxPermits, final Supplier poolClosedExceptionSupplier) { this.poolClosedExceptionSupplier = poolClosedExceptionSupplier; - lock = new ReentrantLock(true); + lock = new ReentrantLock(); permitAvailableOrClosedOrPausedCondition = lock.newCondition(); paused = false; closed = false; this.maxPermits = maxPermits; permits = maxPermits; - waitersEstimate = new AtomicInteger(); causeSupplier = null; } @@ -370,8 +336,8 @@ int permits() { return permits; } - boolean acquirePermitImmediateUnfair() { - return withUnfairLock(lock, () -> { + boolean acquirePermitImmediate() { + return withLock(lock, () -> { throwIfClosedOrPaused(); if (permits > 0) { //noinspection NonAtomicOperationOnVolatileField @@ -391,17 +357,12 @@ boolean acquirePermitImmediateUnfair() { */ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInterruptedException { long remainingNanos = unit.toNanos(timeout); - if (waitersEstimate.get() == 0) { - lockInterruptiblyUnfair(lock); - } else { - lockInterruptibly(lock); - } + lockInterruptibly(lock); try { while (permits == 0 // the absence of short-circuiting is of importance & !throwIfClosedOrPaused()) { try { - waitersEstimate.incrementAndGet(); if (timeout < 0 || remainingNanos == Long.MAX_VALUE) { permitAvailableOrClosedOrPausedCondition.await(); } else if (remainingNanos >= 0) { @@ -411,8 +372,6 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter } } catch (InterruptedException e) { throw interruptAndCreateMongoInterruptedException(null, e); - } finally { - waitersEstimate.decrementAndGet(); } } assertTrue(permits > 0); @@ -425,7 +384,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter } void releasePermit() { - withUnfairLock(lock, () -> { + withLock(lock, () -> { assertTrue(permits < maxPermits); //noinspection NonAtomicOperationOnVolatileField permits++; @@ -434,7 +393,7 @@ void releasePermit() { } void pause(final Supplier causeSupplier) { - withUnfairLock(lock, () -> { + withLock(lock, () -> { if (!paused) { this.paused = true; permitAvailableOrClosedOrPausedCondition.signalAll(); @@ -445,7 +404,7 @@ void pause(final Supplier causeSupplier) { void ready() { if (paused) { - withUnfairLock(lock, () -> { + withLock(lock, () -> { this.paused = false; this.causeSupplier = null; }); @@ -457,7 +416,7 @@ void ready() { */ boolean close() { if (!closed) { - return withUnfairLock(lock, () -> { + return withLock(lock, () -> { if (!closed) { closed = true; permitAvailableOrClosedOrPausedCondition.signalAll(); diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 78db18db2dc..248c213a9a4 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -98,7 +98,6 @@ import static com.mongodb.event.ConnectionClosedEvent.Reason.ERROR; import static com.mongodb.internal.Locks.lockInterruptibly; import static com.mongodb.internal.Locks.withLock; -import static com.mongodb.internal.Locks.withUnfairLock; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.ConcurrentPool.INFINITE_SIZE; @@ -351,11 +350,11 @@ private PooledConnection getPooledConnection(final Timeout waitQueueTimeout, fin } @Nullable - private PooledConnection getPooledConnectionImmediateUnfair() { - UsageTrackingInternalConnection internalConnection = pool.getImmediateUnfair(); + private PooledConnection getPooledConnectionImmediate() { + UsageTrackingInternalConnection internalConnection = pool.getImmediate(); while (internalConnection != null && shouldPrune(internalConnection)) { pool.release(internalConnection, true); - internalConnection = pool.getImmediateUnfair(); + internalConnection = pool.getImmediate(); } return internalConnection == null ? null : new PooledConnection(internalConnection); } @@ -896,7 +895,7 @@ private final class OpenConcurrencyLimiter { private final Deque> desiredConnectionSlots; OpenConcurrencyLimiter(final int maxConnecting) { - lock = new ReentrantLock(true); + lock = new ReentrantLock(false); permitAvailableOrHandedOverOrClosedOrPausedCondition = lock.newCondition(); maxPermits = maxConnecting; permits = maxPermits; @@ -1054,11 +1053,8 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole * 2. Thread#2 checks in a connection. Tries to hand it over, but there are no threads desiring to get one. * 3. Thread#1 executes the current code. Expresses the desire to get a connection via the hand-over mechanism, * but thread#2 has already tried handing over and released its connection to the pool. - * As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool. - * - * This attempt should be unfair because the current thread (Thread#1) has already waited for its turn fairly. - * Waiting fairly again puts the current thread behind other threads, which is unfair to the current thread. */ - availableConnection = getPooledConnectionImmediateUnfair(); + * As a result, thread#1 is waiting for a permit to open a connection despite one being available in the pool. */ + availableConnection = getPooledConnectionImmediate(); if (availableConnection != null) { return availableConnection; } @@ -1093,7 +1089,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole } private void releasePermit() { - withUnfairLock(lock, () -> { + withLock(lock, () -> { assertTrue(permits < maxPermits); permits++; permitAvailableOrHandedOverOrClosedOrPausedCondition.signal(); @@ -1128,7 +1124,7 @@ private void giveUpOnTryingToGetAvailableConnection() { * from threads that are waiting for a permit to open a connection. */ void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) { - boolean handedOver = withUnfairLock(lock, () -> { + boolean handedOver = withLock(lock, () -> { for (//iterate from first (head) to last (tail) MutableReference desiredConnectionSlot : desiredConnectionSlots) { if (desiredConnectionSlot.reference == null) { @@ -1145,7 +1141,7 @@ void tryHandOverOrRelease(final UsageTrackingInternalConnection openConnection) } void signalClosedOrPaused() { - withUnfairLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll); + withLock(lock, permitAvailableOrHandedOverOrClosedOrPausedCondition::signalAll); } }