Add linearizability tests for Mutex and Semaphore and fix them #1898

Merged
merged 13 commits into from
Apr 23, 2020
Changes from 11 commits
2 changes: 1 addition & 1 deletion gradle.properties
@@ -12,7 +12,7 @@ junit_version=4.12
atomicfu_version=0.14.2
knit_version=0.1.3
html_version=0.6.8
lincheck_version=2.5.3
lincheck_version=2.6
dokka_version=0.9.16-rdev-2-mpp-hacks
byte_buddy_version=1.10.9
reactor_vesion=3.2.5.RELEASE
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/build.gradle
@@ -101,6 +101,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
enableAssertions = true
testLogging.showStandardStreams = true
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
}

task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
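The two properties added above are read as integers with a fallback default on the Kotlin side. A minimal sketch of that lookup, assuming a hypothetical helper `intSystemProp` (the project's own `systemProp` helper may behave differently, for example by validating ranges):

```kotlin
// Hypothetical helper for illustration only; not the kotlinx.coroutines `systemProp`.
// Reads a JVM system property as an Int and falls back to `default`
// when the property is unset or is not a valid integer.
fun intSystemProp(name: String, default: Int): Int =
    System.getProperty(name)?.toIntOrNull() ?: default

fun main() {
    // Mirrors what the stress-test task does through `systemProperty`.
    System.setProperty("kotlinx.coroutines.semaphore.maxSpinCycles", "10")
    println(intSystemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)) // prints 10
    println(intSystemProp("example.unset.property", 100))                     // prints 100
}
```

Small values such as a segment size of 2 and 10 spin cycles make segment boundaries and broken cells far more frequent, which is presumably why the stress task overrides the defaults.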
27 changes: 7 additions & 20 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
@@ -380,26 +380,13 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// atomic unlock operation that checks that waiters queue is empty
private class UnlockOp(
@JvmField val queue: LockedQueue
) : OpDescriptor() {
override val atomicOp: AtomicOp<*>? get() = null

override fun perform(affected: Any?): Any? {
/*
Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to
make a decision will reach it consistently. It does not matter what is a proposed
decision when this UnlockOp is no longer active, because in this case the following CAS
will fail anyway.
*/
val success = queue.isEmpty
val update: Any = if (success) EMPTY_UNLOCKED else queue
(affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update)
/*
`perform` invocation from the original `unlock` invocation may be coming too late, when
some other thread had already helped to complete it (either successfully or not).
That operation was unsuccessful if `state` was restored to this `queue` reference and
that is what is being checked below.
*/
return if (affected._state.value === queue) UNLOCK_FAIL else null
) : AtomicOp<MutexImpl>() {
override fun prepare(affected: MutexImpl): Any? =
if (queue.isEmpty) null else UNLOCK_FAIL

override fun complete(affected: MutexImpl, failure: Any?) {
val update: Any = if (failure == null) EMPTY_UNLOCKED else queue
affected._state.compareAndSet(this, update)
}
}
}
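The new `UnlockOp` splits the operation into a `prepare` step that makes the decision and a `complete` step that applies it. The sketch below models that split with plain `AtomicReference`s. All names (`TwoPhaseOp`, `ToyMutex`, `ToyQueue`, `ToyUnlockOp`, `tryUnlock`) are hypothetical, and the real `AtomicOp` in kotlinx.coroutines has more machinery than shown here.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-ins for the tokens used in the diff.
private val NO_DECISION = Any()
val UNLOCK_FAIL = "UNLOCK_FAIL"
val EMPTY_UNLOCKED = "EMPTY_UNLOCKED"

class ToyQueue {
    val waiters = mutableListOf<String>()
    val isEmpty: Boolean get() = waiters.isEmpty()
}

class ToyMutex(queue: ToyQueue) {
    val state = AtomicReference<Any>(queue)
}

// The decision is made once via CAS; every helper then completes with the same decision.
abstract class TwoPhaseOp<T> {
    private val decision = AtomicReference<Any?>(NO_DECISION)
    abstract fun prepare(affected: T): Any?              // null means success
    abstract fun complete(affected: T, failure: Any?)
    fun perform(affected: T): Any? {
        if (decision.get() === NO_DECISION) {
            decision.compareAndSet(NO_DECISION, prepare(affected))
        }
        val failure = decision.get()
        complete(affected, failure)
        return failure
    }
}

class ToyUnlockOp(private val queue: ToyQueue) : TwoPhaseOp<ToyMutex>() {
    override fun prepare(affected: ToyMutex): Any? =
        if (queue.isEmpty) null else UNLOCK_FAIL

    override fun complete(affected: ToyMutex, failure: Any?) {
        val update: Any = if (failure == null) EMPTY_UNLOCKED else queue
        affected.state.compareAndSet(this, update) // no-op if someone already completed
    }
}

// Installs the descriptor in place of the queue, then performs it.
fun tryUnlock(mutex: ToyMutex, queue: ToyQueue): Boolean {
    val op = ToyUnlockOp(queue)
    if (!mutex.state.compareAndSet(queue, op)) return false
    return op.perform(mutex) == null
}
```

Because the decision is stored once, a late `perform` call from the original thread observes the same outcome as any helper, which is the property the removed comment in the old `perform` had to argue for by hand.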
170 changes: 119 additions & 51 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -84,12 +84,41 @@ public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
}

private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
/*
The queue of waiting acquirers is essentially an infinite array based on the list of segments
(see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
and dequeue operation, we increment the corresponding counter at the beginning of the operation
and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
works with an individual cell. We use the corresponding segment pointers to find the required segments.

Here is the state machine for cells. Note that exactly one `acquire` and at most one `release` operation
can deal with each cell. For performance reasons, `release` uses `getAndSet(PERMIT)` to perform its transitions,
so the `PERMIT` state represents several different logical states.

        +------+ `acquire` suspends   +------+   `release` tries    +--------+                    // if `cont.tryResume(..)` succeeds, then
        | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED)   // the corresponding `acquire` operation gets
        +------+                      +------+   to resume `cont`   +--------+                    // a permit and the `release` one completes.
           |                             |
           |                             | `acquire` request is cancelled and the continuation is
           | `release` comes             | replaced with a special `CANCELLED` token to avoid memory leaks
           | to the slot before          V
           | `acquire` and puts    +-----------+   `release` has    +--------+
           | a permit into the     | CANCELLED | -----------------> | PERMIT | (RELEASE FAILED)
           | slot, waiting for     +-----------+        failed      +--------+
           | `acquire` after
           | that.
           |
           |           `acquire` gets   +-------+
           |        +-----------------> | TAKEN | (ELIMINATION HAPPENED)
           V        |    the permit     +-------+
       +--------+   |
       | PERMIT | -<
       +--------+   |
                    |  `release` has waited a bounded time,   +--------+
                    +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
                             but `acquire` has not come       +--------+
*/

// The queue of waiting acquirers is essentially an infinite array based on the list of segments
// (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
// and dequeue operation, we increment the corresponding counter at the beginning of the operation
// and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
// works with an individual cell. We use the corresponding segment pointer to find the required ones.
private val head: AtomicRef<SemaphoreSegment>
private val deqIdx = atomic(0L)
private val tail: AtomicRef<SemaphoreSegment>
@@ -123,74 +152,104 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
override suspend fun acquire() {
val p = _availablePermits.getAndDecrement()
if (p > 0) return // permit acquired
addToQueueAndSuspend()
// While it looks better when the following function is inlined,
// it is important to make `suspend` function invocations in a way
// so that the tail-call optimization can be applied.
acquireSlowPath()
}

override fun release() {
val p = incPermits()
if (p >= 0) return // no waiters
resumeNextFromQueue()
private suspend fun acquireSlowPath() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
while (true) {
if (addAcquireToQueue(cont)) return@sc
val p = _availablePermits.getAndDecrement()
if (p > 0) { // permit acquired
cont.resume(Unit)
return@sc
}
}
}

fun incPermits() = _availablePermits.getAndUpdate { cur ->
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
override fun release() {
while (true) {
val p = _availablePermits.getAndUpdate { cur ->
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
}
if (p >= 0) return
if (tryResumeNextFromQueue()) return
}
}

private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@{ cont ->
/**
* Returns `false` if the received permit cannot be used and the calling operation should restart.
*/
private fun addAcquireToQueue(cont: CancellableContinuation<Unit>): Boolean {
val curTail = this.tail.value
val enqIdx = enqIdx.getAndIncrement()
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
createNewSegment = ::createSegment).run { segment } // cannot be closed
createNewSegment = ::createSegment).segment // cannot be closed
val i = (enqIdx % SEGMENT_SIZE).toInt()
if (segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
// already resumed
cont.resume(Unit)
return@sc
while (true) { // cas loop on cell state
val cellState = segment.get(i)
when {
cellState === null -> // the cell is empty, try to install continuation
if (segment.cas(i, null, cont)) { // fast path -- installed continuation successfully
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
return true
}
cellState === PERMIT -> // the cell already has permit from tryResumeNextFromQueue, try to grab it
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
cont.resume(Unit)
return true
}
cellState === BROKEN -> return false // broken cell, need to retry on a different cell
else -> error("Invalid state $cellState") // this cannot happen
}
}
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
}

@Suppress("UNCHECKED_CAST")
internal fun resumeNextFromQueue() {
try_again@ while (true) {
val curHead = this.head.value
val deqIdx = deqIdx.getAndIncrement()
val id = deqIdx / SEGMENT_SIZE
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
createNewSegment = ::createSegment).run { segment } // cannot be closed
segment.cleanPrev()
if (segment.id > id) {
this.deqIdx.updateIfLower(segment.id * SEGMENT_SIZE)
continue@try_again
private fun tryResumeNextFromQueue(): Boolean {
val curHead = this.head.value
val deqIdx = deqIdx.getAndIncrement()
val id = deqIdx / SEGMENT_SIZE
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
createNewSegment = ::createSegment).segment // cannot be closed
segment.cleanPrev()
if (segment.id > id) return false
val i = (deqIdx % SEGMENT_SIZE).toInt()
val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
when {
cellState === null -> {
// Acquire has not touched this cell yet; wait a bounded time for it to arrive.
// Only addAcquireToQueue can move the cell state from PERMIT to TAKEN.
repeat(MAX_SPIN_CYCLES) {
if (segment.get(i) === TAKEN) return true
}
// Try to break the slot in order not to wait
return !segment.cas(i, PERMIT, BROKEN)
}
val i = (deqIdx % SEGMENT_SIZE).toInt()
val cont = segment.getAndSet(i, RESUMED)
if (cont === null) return // just resumed
if (cont === CANCELLED) continue@try_again
(cont as CancellableContinuation<Unit>).resume(Unit)
return
cellState === CANCELLED -> return false // the acquire was already cancelled
else -> return (cellState as CancellableContinuation<Unit>).tryResume()
}
}
}

private inline fun AtomicLong.updateIfLower(value: Long): Unit = loop { cur ->
if (cur >= value || compareAndSet(cur, value)) return
private fun CancellableContinuation<Unit>.tryResume(): Boolean {
val token = tryResume(Unit) ?: return false
completeResume(token)
return true
}

private class CancelSemaphoreAcquisitionHandler(
private val semaphore: SemaphoreImpl,
private val segment: SemaphoreSegment,
private val index: Int
) : CancelHandler() {
override fun invoke(cause: Throwable?) {
val p = semaphore.incPermits()
if (p >= 0) return
if (segment.cancel(index)) return
semaphore.resumeNextFromQueue()
segment.cancel(index)
}

override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
}

private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
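In the hunk above, the permit counter doubles as a waiter count: a value observed at or below zero by `acquire` sends it to the slow path, and a negative value observed by `release` means someone has to be resumed. The sketch below isolates that arithmetic along with the slot addressing (`index / SEGMENT_SIZE`, `index % SEGMENT_SIZE`). `PermitCounter` and `locate` are hypothetical names, and the queue itself is left out.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the counter logic only; no queue, no suspension.
class PermitCounter(private val permits: Int) {
    private val available = AtomicInteger(permits)

    val value: Int get() = available.get()

    /** True if a permit was taken on the fast path; false means the caller must enqueue. */
    fun acquireFastPath(): Boolean = available.getAndDecrement() > 0

    /** True if the previous value was negative, i.e. a waiter has to be resumed. */
    fun releaseNeedsResume(): Boolean {
        val p = available.getAndUpdate { cur ->
            check(cur < permits) { "The number of released permits cannot be greater than $permits" }
            cur + 1
        }
        return p < 0
    }
}

/** Maps a global slot index to (segment id, index inside that segment). */
fun locate(index: Long, segmentSize: Int): Pair<Long, Int> =
    Pair(index / segmentSize, (index % segmentSize).toInt())

fun main() {
    val counter = PermitCounter(1)
    println(counter.acquireFastPath())    // prints true, counter is now 0
    println(counter.acquireFastPath())    // prints false, counter is now -1 (one waiter)
    println(counter.releaseNeedsResume()) // prints true, counter is back to 0
    println(locate(5L, 2))                // prints (2, 1)
}
```

In the diff, a `release` that fails to resume a waiter loops and increments the counter again, so each failed cell effectively consumes one counter round trip rather than losing a permit.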
@@ -202,6 +261,11 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
@Suppress("NOTHING_TO_INLINE")
inline fun get(index: Int): Any? = acquirers[index].value

@Suppress("NOTHING_TO_INLINE")
inline fun set(index: Int, value: Any?) {
acquirers[index].value = value
}

@Suppress("NOTHING_TO_INLINE")
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)

@@ -210,19 +274,23 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)

// Cleans the acquirer slot located by the specified index
// and removes this segment physically if all slots are cleaned.
fun cancel(index: Int): Boolean {
// Try to cancel the slot
val cancelled = getAndSet(index, CANCELLED) !== RESUMED
fun cancel(index: Int) {
// Clean the slot
set(index, CANCELLED)
// Remove this segment if needed
onSlotCleaned()
return cancelled
}

override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
}

@SharedImmutable
private val RESUMED = Symbol("RESUMED")
private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
@SharedImmutable
private val PERMIT = Symbol("PERMIT")
@SharedImmutable
private val TAKEN = Symbol("TAKEN")
@SharedImmutable
private val BROKEN = Symbol("BROKEN")
@SharedImmutable
private val CANCELLED = Symbol("CANCELLED")
@SharedImmutable
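The cell state machine described in the `SemaphoreImpl` comment can be exercised in isolation. The following is a single-cell sketch under simplifying assumptions: `Waiter` stands in for a continuation, resumption always succeeds (the real code calls `tryResume`, which can fail), and the names `Cell`, `Waiter`, `Permit`, `Taken`, `Broken` and `Cancelled` are illustrative rather than taken from the library.

```kotlin
import java.util.concurrent.atomic.AtomicReference

object Permit
object Taken
object Broken
object Cancelled

// Stand-in for a continuation; `resumed` records that the waiter got its permit.
class Waiter(val name: String) {
    @Volatile var resumed = false
}

class Cell(private val maxSpinCycles: Int = 10) {
    val state = AtomicReference<Any?>(null)

    /** Acquire side. Returns false if the cell is broken and the caller must retry elsewhere. */
    fun acquire(waiter: Waiter): Boolean {
        while (true) { // CAS loop on the cell state
            val s = state.get()
            when {
                s == null -> if (state.compareAndSet(null, waiter)) return true
                s === Permit -> if (state.compareAndSet(Permit, Taken)) {
                    waiter.resumed = true // elimination: the permit was already waiting
                    return true
                }
                s === Broken -> return false
                else -> error("Invalid state $s")
            }
        }
    }

    /** Release side. Returns false if the permit could not be handed over through this cell. */
    fun release(): Boolean {
        val prev = state.getAndSet(Permit)
        return when {
            prev == null -> {
                // Acquire has not arrived yet: wait a bounded time, then try to break the cell.
                repeat(maxSpinCycles) { if (state.get() === Taken) return true }
                !state.compareAndSet(Permit, Broken)
            }
            prev === Cancelled -> false
            else -> {
                (prev as Waiter).resumed = true
                true
            }
        }
    }

    fun cancel() = state.set(Cancelled)
}
```

Run sequentially, a `release()` on a fresh cell always ends in `Broken`, since no `acquire` can arrive during the spin; the `Taken` outcome only shows up when the two sides actually race.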
Original file line number Diff line number Diff line change
@@ -51,13 +51,15 @@ internal class SegmentBasedQueue<T> {
val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment)
if (segmentOrClosed.isClosed) return null
val s = segmentOrClosed.segment
s.cleanPrev()
if (s.id > deqIdx) continue
var el = s.element.value
if (el === null) {
if (s.element.compareAndSet(null, BROKEN)) continue
else el = s.element.value
}
// The link to the previous segment should be cleaned after retrieving the element;
// otherwise, `close()` cannot clean the slot.
s.cleanPrev()
if (el === BROKEN) continue
@Suppress("UNCHECKED_CAST")
return el as T
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
package kotlinx.coroutines.linearizability

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*

class MutexLCStressTest : VerifierState() {
private val mutex = Mutex()

@Operation
fun tryLock() = mutex.tryLock()

@Operation
suspend fun lock() = mutex.lock()

@Operation(handleExceptionsAsResult = [IllegalStateException::class])
fun unlock() = mutex.unlock()

@Test
fun test() = LCStressOptionsDefault()
.actorsBefore(0)
.check(this::class)

override fun extractState() = mutex.isLocked
}
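The test above declares `IllegalStateException` from `unlock` as a legitimate result rather than a failure, and exposes `isLocked` as the state to compare. A hand-written sequential model makes the expected outcomes concrete. `SequentialMutexSpec` is a hypothetical illustration and is not part of Lincheck or of this pull request.

```kotlin
// Hypothetical single-threaded model of the non-suspending Mutex operations.
class SequentialMutexSpec {
    var isLocked = false
        private set

    fun tryLock(): Boolean {
        if (isLocked) return false
        isLocked = true
        return true
    }

    fun unlock() {
        check(isLocked) { "Mutex is not locked" } // throws IllegalStateException
        isLocked = false
    }
}

fun main() {
    val spec = SequentialMutexSpec()
    println(spec.tryLock())                           // prints true
    println(spec.tryLock())                           // prints false
    spec.unlock()
    println(runCatching { spec.unlock() }.isFailure)  // prints true: unlock of an unlocked mutex
}
```

Any result a concurrent run reports for these operations should be explainable by some sequential order of calls against a model like this one.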
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
package kotlinx.coroutines.linearizability

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*

abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
private val semaphore = Semaphore(permits)

@Operation
fun tryAcquire() = semaphore.tryAcquire()

@Operation
suspend fun acquire() = semaphore.acquire()

@Operation(handleExceptionsAsResult = [IllegalStateException::class])
fun release() = semaphore.release()

@Test
fun test() = LCStressOptionsDefault()
.actorsBefore(0)
.check(this::class)

override fun extractState() = semaphore.availablePermits
}

class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1)
class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)
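To make concrete what a linearizability check over these operations looks for, here is a deliberately naive brute-force sketch for two threads. It is not how Lincheck is implemented. It assumes every operation of one thread overlaps with every operation of the other (so only per-thread order is preserved), it models only the non-suspending operations, and all names in it (`SeqSemaphore`, `Observed`, `runOp`, `interleavings`, `isLinearizable`) are hypothetical.

```kotlin
// Hypothetical sequential model of a semaphore with `permits` permits.
class SeqSemaphore(private val permits: Int) {
    var available = permits
        private set

    fun tryAcquire(): Boolean = if (available > 0) { available--; true } else false

    // The exception is recorded as a result, as `handleExceptionsAsResult` does in the test.
    fun release(): String =
        if (available < permits) { available++; "ok" } else "IllegalStateException"
}

// One operation together with the result a concurrent run reported for it.
data class Observed(val op: String, val result: Any)

fun runOp(s: SeqSemaphore, op: String): Any = when (op) {
    "tryAcquire" -> s.tryAcquire()
    "release" -> s.release()
    else -> error("Unknown operation $op")
}

// All merges of two per-thread histories that keep each thread's own order.
fun interleavings(a: List<Observed>, b: List<Observed>): List<List<Observed>> = when {
    a.isEmpty() -> listOf(b)
    b.isEmpty() -> listOf(a)
    else -> interleavings(a.drop(1), b).map { listOf(a[0]) + it } +
            interleavings(a, b.drop(1)).map { listOf(b[0]) + it }
}

// The observed results are accepted if some sequential order reproduces all of them.
fun isLinearizable(permits: Int, a: List<Observed>, b: List<Observed>): Boolean =
    interleavings(a, b).any { order ->
        val s = SeqSemaphore(permits)
        order.all { runOp(s, it.op) == it.result }
    }

fun main() {
    val t = Observed("tryAcquire", true)
    val f = Observed("tryAcquire", false)
    println(isLinearizable(1, listOf(t), listOf(f))) // prints true
    println(isLinearizable(1, listOf(t), listOf(t))) // prints false: one permit, two successes
}
```

The number of interleavings grows combinatorially with history length, so this approach is only practical for the short scenarios a stress test generates.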