diff --git a/gradle.properties b/gradle.properties index daf5b67f44..ae1f5139d0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,7 +12,7 @@ junit_version=4.12 atomicfu_version=0.14.2 knit_version=0.1.3 html_version=0.6.8 -lincheck_version=2.5.3 +lincheck_version=2.6 dokka_version=0.9.16-rdev-2-mpp-hacks byte_buddy_version=1.10.9 reactor_vesion=3.2.5.RELEASE diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index 547a12b4c6..8e899016e7 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -101,6 +101,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) { enableAssertions = true testLogging.showStandardStreams = true systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test + systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2' + systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10' } task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) { diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 1b11bc96cc..769c9f1168 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -380,26 +380,13 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { // atomic unlock operation that checks that waiters queue is empty private class UnlockOp( @JvmField val queue: LockedQueue - ) : OpDescriptor() { - override val atomicOp: AtomicOp<*>? get() = null - - override fun perform(affected: Any?): Any? { - /* - Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to - make a decision will reach it consistently. It does not matter what is a proposed - decision when this UnlockOp is no longer active, because in this case the following CAS - will fail anyway. - */ - val success = queue.isEmpty - val update: Any = if (success) EMPTY_UNLOCKED else queue - (affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update) - /* - `perform` invocation from the original `unlock` invocation may be coming too late, when - some other thread had already helped to complete it (either successfully or not). - That operation was unsuccessful if `state` was restored to this `queue` reference and - that is what is being checked below. - */ - return if (affected._state.value === queue) UNLOCK_FAIL else null + ) : AtomicOp() { + override fun prepare(affected: MutexImpl): Any? = + if (queue.isEmpty) null else UNLOCK_FAIL + + override fun complete(affected: MutexImpl, failure: Any?) { + val update: Any = if (failure == null) EMPTY_UNLOCKED else queue + affected._state.compareAndSet(this, update) } } } diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 7cdc736197..125bbaaeb5 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -84,12 +84,41 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T { } private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { + /* + The queue of waiting acquirers is essentially an infinite array based on the list of segments + (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue + and dequeue operation, we increment the corresponding counter at the beginning of the operation + and use the value before the increment as a slot number. This way, each enqueue-dequeue pair + works with an individual cell. We use the corresponding segment pointers to find the required ones. + + Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation + can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons + so that the state `PERMIT` represents different logical states. + + +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then + | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets + +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes. + | | + | | `acquire` request is cancelled and the continuation is + | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks + | to the slot before V + | `acquire` and puts +-----------+ `release` has +--------+ + | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED) + | slot, waiting for +-----------+ failed +--------+ + | `acquire` after + | that. + | + | `acquire` gets +-------+ + | +-----------------> | TAKEN | (ELIMINATION HAPPENED) + V | the permit +-------+ + +--------+ | + | PERMIT | -< + +--------+ | + | `release` has waited a bounded time, +--------+ + +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED) + but `acquire` has not come +--------+ + */ - // The queue of waiting acquirers is essentially an infinite array based on the list of segments - // (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue - // and dequeue operation, we increment the corresponding counter at the beginning of the operation - // and use the value before the increment as a slot number. This way, each enqueue-dequeue pair - // works with an individual cell.We use the corresponding segment pointer to find the required ones. private val head: AtomicRef private val deqIdx = atomic(0L) private val tail: AtomicRef @@ -123,74 +152,100 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se override suspend fun acquire() { val p = _availablePermits.getAndDecrement() if (p > 0) return // permit acquired - addToQueueAndSuspend() + // While it looks better when the following function is inlined, + // it is important to make `suspend` function invocations in a way + // so that the tail-call optimization can be applied. + acquireSlowPath() } - override fun release() { - val p = incPermits() - if (p >= 0) return // no waiters - resumeNextFromQueue() + private suspend fun acquireSlowPath() = suspendAtomicCancellableCoroutineReusable sc@ { cont -> + while (true) { + if (addAcquireToQueue(cont)) return@sc + val p = _availablePermits.getAndDecrement() + if (p > 0) { // permit acquired + cont.resume(Unit) + return@sc + } + } } - fun incPermits() = _availablePermits.getAndUpdate { cur -> - check(cur < permits) { "The number of released permits cannot be greater than $permits" } - cur + 1 + override fun release() { + while (true) { + val p = _availablePermits.getAndUpdate { cur -> + check(cur < permits) { "The number of released permits cannot be greater than $permits" } + cur + 1 + } + if (p >= 0) return + if (tryResumeNextFromQueue()) return + } } - private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable sc@{ cont -> + /** + * Returns `false` if the received permit cannot be used and the calling operation should restart. + */ + private fun addAcquireToQueue(cont: CancellableContinuation): Boolean { val curTail = this.tail.value val enqIdx = enqIdx.getAndIncrement() val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, - createNewSegment = ::createSegment).run { segment } // cannot be closed + createNewSegment = ::createSegment).segment // cannot be closed val i = (enqIdx % SEGMENT_SIZE).toInt() - if (segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { - // already resumed + // the regular (fast) path -- if the cell is empty, try to install continuation + if (segment.cas(i, null, cont)) { // installed continuation successfully + cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler) + return true + } + // On CAS failure -- the cell must be either PERMIT or BROKEN + // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it + if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair cont.resume(Unit) - return@sc + return true } - cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler) + assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it + return false // broken cell, need to retry on a different cell } @Suppress("UNCHECKED_CAST") - internal fun resumeNextFromQueue() { - try_again@ while (true) { - val curHead = this.head.value - val deqIdx = deqIdx.getAndIncrement() - val id = deqIdx / SEGMENT_SIZE - val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, - createNewSegment = ::createSegment).run { segment } // cannot be closed - segment.cleanPrev() - if (segment.id > id) { - this.deqIdx.updateIfLower(segment.id * SEGMENT_SIZE) - continue@try_again + private fun tryResumeNextFromQueue(): Boolean { + val curHead = this.head.value + val deqIdx = deqIdx.getAndIncrement() + val id = deqIdx / SEGMENT_SIZE + val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, + createNewSegment = ::createSegment).segment // cannot be closed + segment.cleanPrev() + if (segment.id > id) return false + val i = (deqIdx % SEGMENT_SIZE).toInt() + val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state + when { + cellState === null -> { + // Acquire has not touched this cell yet, wait until it comes for a bounded time + // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue + repeat(MAX_SPIN_CYCLES) { + if (segment.get(i) === TAKEN) return true + } + // Try to break the slot in order not to wait + return !segment.cas(i, PERMIT, BROKEN) } - val i = (deqIdx % SEGMENT_SIZE).toInt() - val cont = segment.getAndSet(i, RESUMED) - if (cont === null) return // just resumed - if (cont === CANCELLED) continue@try_again - (cont as CancellableContinuation).resume(Unit) - return + cellState === CANCELLED -> return false // the acquire was already cancelled + else -> return (cellState as CancellableContinuation).tryResume() } } } -private inline fun AtomicLong.updateIfLower(value: Long): Unit = loop { cur -> - if (cur >= value || compareAndSet(cur, value)) return +private fun CancellableContinuation.tryResume(): Boolean { + val token = tryResume(Unit) ?: return false + completeResume(token) + return true } private class CancelSemaphoreAcquisitionHandler( - private val semaphore: SemaphoreImpl, private val segment: SemaphoreSegment, private val index: Int ) : CancelHandler() { override fun invoke(cause: Throwable?) { - val p = semaphore.incPermits() - if (p >= 0) return - if (segment.cancel(index)) return - semaphore.resumeNextFromQueue() + segment.cancel(index) } - override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]" + override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]" } private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) @@ -202,6 +257,11 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) @Suppress("NOTHING_TO_INLINE") inline fun get(index: Int): Any? = acquirers[index].value + @Suppress("NOTHING_TO_INLINE") + inline fun set(index: Int, value: Any?) { + acquirers[index].value = value + } + @Suppress("NOTHING_TO_INLINE") inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) @@ -210,19 +270,23 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) // Cleans the acquirer slot located by the specified index // and removes this segment physically if all slots are cleaned. - fun cancel(index: Int): Boolean { - // Try to cancel the slot - val cancelled = getAndSet(index, CANCELLED) !== RESUMED + fun cancel(index: Int) { + // Clean the slot + set(index, CANCELLED) // Remove this segment if needed onSlotCleaned() - return cancelled } override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" } - @SharedImmutable -private val RESUMED = Symbol("RESUMED") +private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100) +@SharedImmutable +private val PERMIT = Symbol("PERMIT") +@SharedImmutable +private val TAKEN = Symbol("TAKEN") +@SharedImmutable +private val BROKEN = Symbol("BROKEN") @SharedImmutable private val CANCELLED = Symbol("CANCELLED") @SharedImmutable diff --git a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt index 3d1305c682..5e408b74ac 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt @@ -51,13 +51,15 @@ internal class SegmentBasedQueue { val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment) if (segmentOrClosed.isClosed) return null val s = segmentOrClosed.segment - s.cleanPrev() if (s.id > deqIdx) continue var el = s.element.value if (el === null) { if (s.element.compareAndSet(null, BROKEN)) continue else el = s.element.value } + // The link to the previous segment should be cleaned after retrieving the element; + // otherwise, `close()` cannot clean the slot. + s.cleanPrev() if (el === BROKEN) continue @Suppress("UNCHECKED_CAST") return el as T diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt new file mode 100644 index 0000000000..9542b5d8de --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/MutexLCStressTest.kt @@ -0,0 +1,31 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") +package kotlinx.coroutines.linearizability + +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* + +class MutexLCStressTest : VerifierState() { + private val mutex = Mutex() + + @Operation + fun tryLock() = mutex.tryLock() + + @Operation + suspend fun lock() = mutex.lock() + + @Operation(handleExceptionsAsResult = [IllegalStateException::class]) + fun unlock() = mutex.unlock() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .check(this::class) + + override fun extractState() = mutex.isLocked +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt new file mode 100644 index 0000000000..52902f4987 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +@file:Suppress("unused") +package kotlinx.coroutines.linearizability + +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* + +abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() { + private val semaphore = Semaphore(permits) + + @Operation + fun tryAcquire() = semaphore.tryAcquire() + + @Operation + suspend fun acquire() = semaphore.acquire() + + @Operation(handleExceptionsAsResult = [IllegalStateException::class]) + fun release() = semaphore.release() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .check(this::class) + + override fun extractState() = semaphore.availablePermits +} + +class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1) +class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2) \ No newline at end of file