Skip to content

Add linearizability tests for Mutex and Semaphore and fix them #1898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ junit_version=4.12
atomicfu_version=0.14.2
knit_version=0.1.3
html_version=0.6.8
lincheck_version=2.5.3
lincheck_version=2.6
dokka_version=0.9.16-rdev-2-mpp-hacks
byte_buddy_version=1.10.9
reactor_vesion=3.2.5.RELEASE
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
enableAssertions = true
testLogging.showStandardStreams = true
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
}

task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
Expand Down
22 changes: 7 additions & 15 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -383,23 +383,15 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
) : OpDescriptor() {
override val atomicOp: AtomicOp<*>? get() = null

private val successful = atomic<Boolean?>(null)

override fun perform(affected: Any?): Any? {
/*
Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to
make a decision will reach it consistently. It does not matter what is a proposed
decision when this UnlockOp is no longer active, because in this case the following CAS
will fail anyway.
*/
val success = queue.isEmpty
val update: Any = if (success) EMPTY_UNLOCKED else queue
// Note: queue cannot change while this UnlockOp is in progress,
// so all concurrent attempts to make a decision will reach it consistently.
successful.compareAndSet(null, queue.isEmpty)
val update: Any = if (successful.value!!) EMPTY_UNLOCKED else queue
(affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update)
/*
`perform` invocation from the original `unlock` invocation may be coming too late, when
some other thread had already helped to complete it (either successfully or not).
That operation was unsuccessful if `state` was restored to this `queue` reference and
that is what is being checked below.
*/
return if (affected._state.value === queue) UNLOCK_FAIL else null
return if (successful.value!!) null else UNLOCK_FAIL
}
}
}
109 changes: 61 additions & 48 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -121,76 +121,80 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
}

override suspend fun acquire() {
val p = _availablePermits.getAndDecrement()
if (p > 0) return // permit acquired
addToQueueAndSuspend()
while (true) {
val p = _availablePermits.getAndDecrement()
if (p > 0) return // permit acquired
if (addToQueueAndSuspend()) return
}
}

override fun release() {
val p = incPermits()
if (p >= 0) return // no waiters
resumeNextFromQueue()
}

fun incPermits() = _availablePermits.getAndUpdate { cur ->
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
while (true) {
val p = _availablePermits.getAndUpdate { cur ->
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
}
if (p >= 0) return
if (tryResumeNextFromQueue()) return
}
}

private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@{ cont ->
/**
* Returns `false` if the received permit cannot be used and the calling operation should restart.
*/
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Boolean> sc@{ cont ->
val curTail = this.tail.value
val enqIdx = enqIdx.getAndIncrement()
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
createNewSegment = ::createSegment).run { segment } // cannot be closed
createNewSegment = ::createSegment).segment // cannot be closed
val i = (enqIdx % SEGMENT_SIZE).toInt()
if (segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
// already resumed
cont.resume(Unit)
if (segment.get(i) === PERMIT || !segment.cas(i, null, cont)) {
// The permit is already in the queue, try to grab it
cont.resume(segment.cas(i, PERMIT, TAKEN))
return@sc
}
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
}

@Suppress("UNCHECKED_CAST")
internal fun resumeNextFromQueue() {
try_again@ while (true) {
val curHead = this.head.value
val deqIdx = deqIdx.getAndIncrement()
val id = deqIdx / SEGMENT_SIZE
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
createNewSegment = ::createSegment).run { segment } // cannot be closed
segment.cleanPrev()
if (segment.id > id) {
this.deqIdx.updateIfLower(segment.id * SEGMENT_SIZE)
continue@try_again
private fun tryResumeNextFromQueue(): Boolean {
val curHead = this.head.value
val deqIdx = deqIdx.getAndIncrement()
val id = deqIdx / SEGMENT_SIZE
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
createNewSegment = ::createSegment).segment // cannot be closed
segment.cleanPrev()
if (segment.id > id) return false
val i = (deqIdx % SEGMENT_SIZE).toInt()
val cont = segment.getAndSet(i, PERMIT)
if (cont === CANCELLED) return false
if (cont === null) {
// Wait until an opposite operation comes for a bounded time
repeat(MAX_SPIN_CYCLES) {
if (segment.get(i) === TAKEN) return true
}
val i = (deqIdx % SEGMENT_SIZE).toInt()
val cont = segment.getAndSet(i, RESUMED)
if (cont === null) return // just resumed
if (cont === CANCELLED) continue@try_again
(cont as CancellableContinuation<Unit>).resume(Unit)
return
// Try to break the slot in order not to wait
return !segment.cas(i, PERMIT, BROKEN)
}
return (cont as CancellableContinuation<Boolean>).tryResume()
}
}

private inline fun AtomicLong.updateIfLower(value: Long): Unit = loop { cur ->
if (cur >= value || compareAndSet(cur, value)) return
private fun CancellableContinuation<Boolean>.tryResume(): Boolean {
val token = tryResume(true) ?: return false
completeResume(token)
return true
}

private class CancelSemaphoreAcquisitionHandler(
private val semaphore: SemaphoreImpl,
private val segment: SemaphoreSegment,
private val index: Int
) : CancelHandler() {
override fun invoke(cause: Throwable?) {
val p = semaphore.incPermits()
if (p >= 0) return
if (segment.cancel(index)) return
semaphore.resumeNextFromQueue()
segment.cancel(index)
}

override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
}

private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
Expand All @@ -202,6 +206,11 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
@Suppress("NOTHING_TO_INLINE")
inline fun get(index: Int): Any? = acquirers[index].value

@Suppress("NOTHING_TO_INLINE")
inline fun set(index: Int, value: Any?) {
acquirers[index].value = value
}

@Suppress("NOTHING_TO_INLINE")
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)

Expand All @@ -210,19 +219,23 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)

// Cleans the acquirer slot located by the specified index
// and removes this segment physically if all slots are cleaned.
fun cancel(index: Int): Boolean {
// Try to cancel the slot
val cancelled = getAndSet(index, CANCELLED) !== RESUMED
fun cancel(index: Int) {
// Clean the slot
set(index, CANCELLED)
// Remove this segment if needed
onSlotCleaned()
return cancelled
}

override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
}

@SharedImmutable
private val RESUMED = Symbol("RESUMED")
private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100_000)
@SharedImmutable
private val PERMIT = Symbol("PERMIT")
@SharedImmutable
private val TAKEN = Symbol("TAKEN")
@SharedImmutable
private val BROKEN = Symbol("TAKEN")
@SharedImmutable
private val CANCELLED = Symbol("CANCELLED")
@SharedImmutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ internal class SegmentBasedQueue<T> {
val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment)
if (segmentOrClosed.isClosed) return null
val s = segmentOrClosed.segment
s.cleanPrev()
if (s.id > deqIdx) continue
var el = s.element.value
if (el === null) {
if (s.element.compareAndSet(null, BROKEN)) continue
else el = s.element.value
}
// The link to the previous segment should be cleaned after retrieving the element;
// otherwise, `close()` cannot clean the slot.
s.cleanPrev()
if (el === BROKEN) continue
@Suppress("UNCHECKED_CAST")
return el as T
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
package kotlinx.coroutines.linearizability

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*

class MutexLCStressTest : VerifierState() {
private val mutex = Mutex()

@Operation
fun tryLock() = mutex.tryLock()

@Operation
suspend fun lock() = mutex.lock()

@Operation(handleExceptionsAsResult = [IllegalStateException::class])
fun unlock() = mutex.unlock()

@Test
fun test() = LCStressOptionsDefault()
.actorsBefore(0)
.check(this::class)

override fun extractState() = mutex.isLocked
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
package kotlinx.coroutines.linearizability

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*

abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
private val semaphore = Semaphore(permits)

@Operation
fun tryAcquire() = semaphore.tryAcquire()

@Operation
suspend fun acquire() = semaphore.acquire()

@Operation(handleExceptionsAsResult = [IllegalStateException::class])
fun release() = semaphore.release()

@Test
fun test() = LCStressOptionsDefault()
.actorsBefore(0)
.check(this::class)

override fun extractState() = semaphore.availablePermits
}

class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1)
class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)