From 9658d3ff929a97a0d433774ab0844c5841023d3f Mon Sep 17 00:00:00 2001 From: Nikita Koval Date: Thu, 11 Nov 2021 18:43:53 +0300 Subject: [PATCH] Base Mutex implementation on Semaphore and improve the latter --- .../common/src/sync/Mutex.kt | 366 ++++-------------- .../common/src/sync/Semaphore.kt | 82 +++- .../jvm/test/lincheck/MutexLincheckTest.kt | 37 +- .../jvm/test/selects/SelectMutexStressTest.kt | 1 - 4 files changed, 176 insertions(+), 310 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index c02422825e..11fdf2ce96 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -7,10 +7,8 @@ package kotlinx.coroutines.sync import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* -import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlin.contracts.* -import kotlin.jvm.* import kotlin.native.concurrent.* /** @@ -27,13 +25,17 @@ import kotlin.native.concurrent.* */ public interface Mutex { /** - * Returns `true` when this mutex is locked. + * Returns `true` if this mutex is locked. */ public val isLocked: Boolean /** * Tries to lock this mutex, returning `false` if this mutex is already locked. * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of your critical section, and [unlock] is never invoked before a successful + * lock acquisition. + * * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex * is already locked with the same token (same identity), this function throws [IllegalStateException]. */ @@ -52,24 +54,35 @@ public interface Mutex { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * Use [tryLock] to try acquiring a lock without waiting. + * This function can be used in [select] invocation with [onLock] clause. + * Use [tryLock] to try acquiring the lock without waiting. * * This function is fair; suspended callers are resumed in first-in-first-out order. * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of the critical section, and [unlock] is never invoked before a successful + * lock acquisition. + * * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex * is already locked with the same token (same identity), this function throws [IllegalStateException]. */ public suspend fun lock(owner: Any? = null) /** - * Deprecated for removal without built-in replacement. + * Clause for [select] expression of [lock] suspending function that selects when the mutex is locked. + * Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected + * the reference to this mutex is passed into the corresponding block. + * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of the critical section, and [unlock] is never invoked before a successful + * lock acquisition. */ @Deprecated(level = DeprecationLevel.WARNING, message = "Mutex.onLock deprecated without replacement. " + "For additional details please refer to #2794") public val onLock: SelectClause2 /** - * Checks mutex locked by owner + * Checks whether this mutex is locked by the specified owner. * * @return `true` on mutex lock by owner, `false` if not locker or it is locked by different owner */ @@ -79,6 +92,10 @@ public interface Mutex { * Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or * was locked with a different owner token (by identity). * + * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always + * released at the end of the critical section, and [unlock] is never invoked before a successful + * lock acquisition. + * * @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex * was locked with the different token (by identity), this function throws [IllegalStateException]. */ @@ -93,7 +110,7 @@ public interface Mutex { */ @Suppress("FunctionName") public fun Mutex(locked: Boolean = false): Mutex = - MutexImpl(locked) + MutexImpl(locked) // TODO: locked with owner? /** * Executes the given [action] under this mutex's lock. @@ -105,7 +122,7 @@ public fun Mutex(locked: Boolean = false): Mutex = */ @OptIn(ExperimentalContracts::class) public suspend inline fun Mutex.withLock(owner: Any? = null, action: () -> T): T { - contract { + contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) } @@ -117,307 +134,76 @@ public suspend inline fun Mutex.withLock(owner: Any? = null, action: () -> T } } -@SharedImmutable -private val LOCK_FAIL = Symbol("LOCK_FAIL") -@SharedImmutable -private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL") -@SharedImmutable -private val LOCKED = Symbol("LOCKED") -@SharedImmutable -private val UNLOCKED = Symbol("UNLOCKED") - -@SharedImmutable -private val EMPTY_LOCKED = Empty(LOCKED) -@SharedImmutable -private val EMPTY_UNLOCKED = Empty(UNLOCKED) -private class Empty( - @JvmField val locked: Any -) { - override fun toString(): String = "Empty[$locked]" -} +internal class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1 else 0), Mutex { + private val owner = atomic(if (locked) null else NO_OWNER) -internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { - // State is: Empty | LockedQueue | OpDescriptor - // shared objects while we have no waiters - private val _state = atomic(if (locked) EMPTY_LOCKED else EMPTY_UNLOCKED) + override val isLocked: Boolean get() = + availablePermits == 0 - public override val isLocked: Boolean get() { - _state.loop { state -> - when (state) { - is Empty -> return state.locked !== UNLOCKED - is LockedQueue -> return true - is OpDescriptor -> state.perform(this) // help - else -> error("Illegal state $state") - } + override fun holdsLock(owner: Any): Boolean { + while (true) { + // Is this mutex locked? + if (!isLocked) return false + val curOwner = this.owner.value + // Wait in a spin-loop until the owner is set + if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE + // Check the owner + return curOwner === owner } } - // for tests ONLY - internal val isLockedEmptyQueueState: Boolean get() { - val state = _state.value - return state is LockedQueue && state.isEmpty - } - - public override fun tryLock(owner: Any?): Boolean { - _state.loop { state -> - when (state) { - is Empty -> { - if (state.locked !== UNLOCKED) return false - val update = if (owner == null) EMPTY_LOCKED else Empty( - owner - ) - if (_state.compareAndSet(state, update)) return true - } - is LockedQueue -> { - check(state.owner !== owner) { "Already locked by $owner" } - return false - } - is OpDescriptor -> state.perform(this) // help - else -> error("Illegal state $state") - } - } - } - - public override suspend fun lock(owner: Any?) { - // fast-path -- try lock + override suspend fun lock(owner: Any?) { if (tryLock(owner)) return - // slow-path -- suspend - return lockSuspend(owner) - } - - private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable sc@ { cont -> - var waiter = LockCont(owner, cont) - _state.loop { state -> - when (state) { - is Empty -> { - if (state.locked !== UNLOCKED) { // try upgrade to queue & retry - _state.compareAndSet(state, LockedQueue(state.locked)) - } else { - // try lock - val update = if (owner == null) EMPTY_LOCKED else Empty(owner) - if (_state.compareAndSet(state, update)) { // locked - // TODO implement functional type in LockCont as soon as we get rid of legacy JS - cont.resume(Unit) { unlock(owner) } - return@sc - } - } - } - is LockedQueue -> { - val curOwner = state.owner - check(curOwner !== owner) { "Already locked by $owner" } - - state.addLast(waiter) - /* - * If the state has been changed while we were adding the waiter, - * it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten. - * To rendezvous that, we try to "invalidate" our node and go for retry. - * - * Node has to be re-instantiated as we do not support node re-adding, even to - * another list - */ - if (_state.value === state || !waiter.take()) { - // added to waiter list - cont.removeOnCancellation(waiter) - return@sc - } - - waiter = LockCont(owner, cont) - return@loop - } - is OpDescriptor -> state.perform(this) // help - else -> error("Illegal state $state") - } - } - } - - override val onLock: SelectClause2 - get() = this - - // registerSelectLock - @Suppress("PARAMETER_NAME_CHANGED_ON_OVERRIDE") - override fun registerSelectClause2(select: SelectInstance, owner: Any?, block: suspend (Mutex) -> R) { - while (true) { // lock-free loop on state - if (select.isSelected) return - when (val state = _state.value) { - is Empty -> { - if (state.locked !== UNLOCKED) { // try upgrade to queue & retry - _state.compareAndSet(state, LockedQueue(state.locked)) - } else { - // try lock - val failure = select.performAtomicTrySelect(TryLockDesc(this, owner)) - when { - failure == null -> { // success - block.startCoroutineUnintercepted(receiver = this, completion = select.completion) - return - } - failure === ALREADY_SELECTED -> return // already selected -- bail out - failure === LOCK_FAIL -> {} // retry - failure === RETRY_ATOMIC -> {} // retry - else -> error("performAtomicTrySelect(TryLockDesc) returned $failure") - } - } - } - is LockedQueue -> { - check(state.owner !== owner) { "Already locked by $owner" } - val node = LockSelect(owner, select, block) - /* - * If the state has been changed while we were adding the waiter, - * it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten. - * To rendezvous that, we try to "invalidate" our node and go for retry. - * - * Node has to be re-instantiated as we do not support node re-adding, even to - * another list - */ - state.addLast(node) - if (_state.value === state || !node.take()) { - // added to waiter list - select.disposeOnSelect(node) - return - } - } - is OpDescriptor -> state.perform(this) // help - else -> error("Illegal state $state") - } - } + lockSuspend(owner) } - private class TryLockDesc( - @JvmField val mutex: MutexImpl, - @JvmField val owner: Any? - ) : AtomicDesc() { - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - private inner class PrepareOp(override val atomicOp: AtomicOp<*>) : OpDescriptor() { - override fun perform(affected: Any?): Any? { - val update: Any = if (atomicOp.isDecided) EMPTY_UNLOCKED else atomicOp // restore if was already decided - (affected as MutexImpl)._state.compareAndSet(this, update) - return null // ok - } - } - - override fun prepare(op: AtomicOp<*>): Any? { - val prepare = PrepareOp(op) - if (!mutex._state.compareAndSet(EMPTY_UNLOCKED, prepare)) return LOCK_FAIL - return prepare.perform(mutex) - } - - override fun complete(op: AtomicOp<*>, failure: Any?) { - val update = if (failure != null) EMPTY_UNLOCKED else { - if (owner == null) EMPTY_LOCKED else Empty(owner) - } - mutex._state.compareAndSet(op, update) - } + private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable { cont -> + val contWithOwner = CancellableContinuationWithOwner(cont, owner) + acquire(contWithOwner) } - public override fun holdsLock(owner: Any) = - _state.value.let { state -> - when (state) { - is Empty -> state.locked === owner - is LockedQueue -> state.owner === owner - else -> false - } - } + override fun tryLock(owner: Any?): Boolean = + if (tryAcquire()) { + this.owner.value = owner + true + } else false override fun unlock(owner: Any?) { - _state.loop { state -> - when (state) { - is Empty -> { - if (owner == null) - check(state.locked !== UNLOCKED) { "Mutex is not locked" } - else - check(state.locked === owner) { "Mutex is locked by ${state.locked} but expected $owner" } - if (_state.compareAndSet(state, EMPTY_UNLOCKED)) return - } - is OpDescriptor -> state.perform(this) - is LockedQueue -> { - if (owner != null) - check(state.owner === owner) { "Mutex is locked by ${state.owner} but expected $owner" } - val waiter = state.removeFirstOrNull() - if (waiter == null) { - val op = UnlockOp(state) - if (_state.compareAndSet(state, op) && op.perform(this) == null) return - } else { - if ((waiter as LockWaiter).tryResumeLockWaiter()) { - state.owner = waiter.owner ?: LOCKED - waiter.completeResumeLockWaiter() - return - } - } - } - else -> error("Illegal state $state") - } - } - } - - override fun toString(): String { - _state.loop { state -> - when (state) { - is Empty -> return "Mutex[${state.locked}]" - is OpDescriptor -> state.perform(this) - is LockedQueue -> return "Mutex[${state.owner}]" - else -> error("Illegal state $state") - } + while (true) { + // Is this mutex locked? + check(isLocked) { "This mutex is not locked" } + // Read the owner, waiting until it is set in a spin-loop if required. + val curOwner = this.owner.value + if (curOwner === NO_OWNER) continue // <-- ATTENTION, BLOCKING PART HERE + // Check the owner. + check(curOwner === owner) { "This mutex is locked by $curOwner, but $owner is expected" } + // Try to clean the owner first. We need to use CAS here to synchronize with concurrent `unlock(..)`-s. + if (!this.owner.compareAndSet(owner, NO_OWNER)) continue + // Release the semaphore permit at the end. + release() + return } } - private class LockedQueue( - @JvmField var owner: Any - ) : LockFreeLinkedListHead() { - override fun toString(): String = "LockedQueue[$owner]" - } + override val onLock: SelectClause2 get() = TODO("Will be implemented later") - private abstract inner class LockWaiter( - @JvmField val owner: Any? - ) : LockFreeLinkedListNode(), DisposableHandle { - private val isTaken = atomic(false) - fun take(): Boolean = isTaken.compareAndSet(false, true) - final override fun dispose() { remove() } - abstract fun tryResumeLockWaiter(): Boolean - abstract fun completeResumeLockWaiter() - } - - private inner class LockCont( - owner: Any?, - private val cont: CancellableContinuation - ) : LockWaiter(owner) { - - override fun tryResumeLockWaiter(): Boolean { - if (!take()) return false - return cont.tryResume(Unit, idempotent = null) { - // if this continuation gets cancelled during dispatch to the caller, then release the lock - unlock(owner) - } != null + private inner class CancellableContinuationWithOwner( + val cont: CancellableContinuation, + val owner: Any? + ) : CancellableContinuation by cont { + override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? { + val token = cont.tryResume(value, idempotent, onCancellation) + if (token !== null) this@MutexImpl.owner.value = owner + return token } - override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN) - override fun toString(): String = "LockCont[$owner, ${cont}] for ${this@MutexImpl}" - } - - private inner class LockSelect( - owner: Any?, - @JvmField val select: SelectInstance, - @JvmField val block: suspend (Mutex) -> R - ) : LockWaiter(owner) { - override fun tryResumeLockWaiter(): Boolean = take() && select.trySelect() - override fun completeResumeLockWaiter() { - block.startCoroutineCancellable(receiver = this@MutexImpl, completion = select.completion) { - // if this continuation gets cancelled during dispatch to the caller, then release the lock - unlock(owner) - } - } - override fun toString(): String = "LockSelect[$owner, $select] for ${this@MutexImpl}" - } - - // atomic unlock operation that checks that waiters queue is empty - private class UnlockOp( - @JvmField val queue: LockedQueue - ) : AtomicOp() { - override fun prepare(affected: MutexImpl): Any? = - if (queue.isEmpty) null else UNLOCK_FAIL - - override fun complete(affected: MutexImpl, failure: Any?) { - val update: Any = if (failure == null) EMPTY_UNLOCKED else queue - affected._state.compareAndSet(this, update) + override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) { + this@MutexImpl.owner.value = owner + cont.resume(value, onCancellation) } } } + +@SharedImmutable +private val NO_OWNER = Symbol("NO_OWNER") diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index e8b28bc15c..bbce582fb9 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -8,9 +8,9 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.contracts.* -import kotlin.coroutines.* +import kotlin.js.* import kotlin.math.* -import kotlin.native.concurrent.SharedImmutable +import kotlin.native.concurrent.* /** * A counting semaphore for coroutines that logically maintains a number of available permits. @@ -90,7 +90,7 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T { } } -private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { +internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { /* The queue of waiting acquirers is essentially an infinite array based on the list of segments (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue @@ -140,11 +140,11 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se } /** - * This counter indicates a number of available permits if it is non-negative, - * or the size with minus sign otherwise. Note, that 32-bit counter is enough here - * since the maximal number of available permits is [permits] which is [Int], - * and the maximum number of waiting acquirers cannot be greater than 2^31 in any - * real application. + * This counter indicates the number of available permits if it is positive, + * or the negated number of waiters on this semaphore otherwise. + * Note, that 32-bit counter is enough here since the maximal number of available + * permits is [permits] which is [Int], and the maximum number of waiting acquirers + * cannot be greater than 2^31 in any real application. */ private val _availablePermits = atomic(permits - acquiredPermits) override val availablePermits: Int get() = max(_availablePermits.value, 0) @@ -152,43 +152,95 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se private val onCancellationRelease = { _: Throwable -> release() } override fun tryAcquire(): Boolean { - _availablePermits.loop { p -> + while (true) { + // Get the current number of available permits. + val p = _availablePermits.value + // Is the number of available permits greater + // than the maximal one because of an incorrect + // `release()` call without a preceding `acquire()`? + // Change it to `permits` and start from the beginning. + if (p > permits) { + coerceAvailablePermitsAtMaximum() + continue + } + // Try to decrement the number of available + // permits if it is greater than zero. if (p <= 0) return false if (_availablePermits.compareAndSet(p, p - 1)) return true } } override suspend fun acquire() { + // Decrement the number of available permits. val p = _availablePermits.getAndDecrement() + // Is the permit acquired? if (p > 0) return // permit acquired + // Try to suspend otherwise. // While it looks better when the following function is inlined, // it is important to make `suspend` function invocations in a way - // so that the tail-call optimization can be applied. + // so that the tail-call optimization can be applied here. acquireSlowPath() } private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable sc@ { cont -> + // Try to suspend. + if (addAcquireToQueue(cont)) return@sc + // The suspension has been failed + // due to the synchronous resumption mode. + // Restart the whole `acquire`. + acquire(cont) + } + + @JsName("acquireCont") + protected fun acquire(cont: CancellableContinuation) { while (true) { - if (addAcquireToQueue(cont)) return@sc + // Decrement the number of available permits at first. val p = _availablePermits.getAndDecrement() + // Is the permit acquired? if (p > 0) { // permit acquired cont.resume(Unit, onCancellationRelease) - return@sc + return } + // Permit has not been acquired, try to suspend. + if (addAcquireToQueue(cont)) return } } override fun release() { while (true) { - val p = _availablePermits.getAndUpdate { cur -> - check(cur < permits) { "The number of released permits cannot be greater than $permits" } - cur + 1 + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is this `release` call correct and does not + // exceed the maximal number of permits? + if (p >= permits) { + // Revert the number of available permits + // back to the correct one and fail with error. + coerceAvailablePermitsAtMaximum() + error("The number of released permits cannot be greater than $permits") } + // Is there a waiter that should be resumed? if (p >= 0) return + // Try to resume the first waiter, and + // restart the operation if either this + // first waiter is cancelled or + // due to `SYNC` resumption mode. if (tryResumeNextFromQueue()) return } } + /** + * Changes the number of available permits to + * [permits] if it became greater due to an + * incorrect [release] call. + */ + private fun coerceAvailablePermitsAtMaximum() { + while (true) { + val cur = _availablePermits.value + if (cur <= permits) break + if (_availablePermits.compareAndSet(cur, permits)) break + } + } + /** * Returns `false` if the received permit cannot be used and the calling operation should restart. */ diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt index a278985fdd..e4b1281205 100644 --- a/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/lincheck/MutexLincheckTest.kt @@ -7,10 +7,14 @@ package kotlinx.coroutines.lincheck import kotlinx.coroutines.* import kotlinx.coroutines.sync.* import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.annotations.* import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* +import org.jetbrains.kotlinx.lincheck.strategy.stress.* +import org.jetbrains.kotlinx.lincheck.verifier.* -class MutexLincheckTest : AbstractLincheckTest() { +class MutexWithoutOwnerLincheckTest : AbstractLincheckTest() { private val mutex = Mutex() @Operation @@ -25,8 +29,33 @@ class MutexLincheckTest : AbstractLincheckTest() { override fun > O.customize(isStressTest: Boolean): O = actorsBefore(0) - override fun ModelCheckingOptions.customize(isStressTest: Boolean) = - checkObstructionFreedom() - override fun extractState() = mutex.isLocked } + +@Param(name = "owner", gen = IntGen::class, conf = "0:2") +class MutexLincheckTest : AbstractLincheckTest() { + private val mutex = Mutex() + + @Operation + fun tryLock(@Param(name = "owner") owner: Int) = mutex.tryLock(owner.asOwnerOrNull) + + @Operation(promptCancellation = true) + suspend fun lock(@Param(name = "owner") owner: Int) = mutex.lock(owner.asOwnerOrNull) + + @Operation(handleExceptionsAsResult = [IllegalStateException::class]) + fun unlock(@Param(name = "owner") owner: Int) = mutex.unlock(owner.asOwnerOrNull) + + @Operation + fun isLocked() = mutex.isLocked + + @Operation + fun holdsLock(@Param(name = "owner") owner: Int) = mutex.holdsLock(owner) + + override fun > O.customize(isStressTest: Boolean): O = + actorsBefore(0) + + // state[i] == true <=> mutex.holdsLock(i) with the only exception for 0 that specifies `null`. + override fun extractState() = (1..2).map { mutex.holdsLock(it) } + mutex.isLocked + + private val Int.asOwnerOrNull get() = if (this == 0) null else this +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectMutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectMutexStressTest.kt index 5489ea5d73..97ebbea94e 100644 --- a/kotlinx-coroutines-core/jvm/test/selects/SelectMutexStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectMutexStressTest.kt @@ -28,7 +28,6 @@ class SelectMutexStressTest : TestBase() { yield() // so it can cleanup after itself } assertTrue(mutex.isLocked) - assertTrue(mutex.isLockedEmptyQueueState) finish(n + 2) } } \ No newline at end of file