Skip to content

Commit 8a0cfbe

Browse files
ndkovalelizarov
authored andcommitted
Add linearizability tests for Mutex and Semaphore and fix them (#1898)
* Make semaphore implementation linearizable (ignoring extra suspensions) * Make mutex implementation linearizable (ignoring extra suspensions) * Add linearizability tests for mutex and semaphore * Fix `SegmentQueueLCStressTest` Fixes #1737 Co-authored-by: Roman Elizarov <[email protected]>
1 parent 45487d1 commit 8a0cfbe

File tree

7 files changed

+192
-72
lines changed

7 files changed

+192
-72
lines changed

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ junit_version=4.12
1212
atomicfu_version=0.14.2
1313
knit_version=0.1.3
1414
html_version=0.6.8
15-
lincheck_version=2.5.3
15+
lincheck_version=2.6
1616
dokka_version=0.9.16-rdev-2-mpp-hacks
1717
byte_buddy_version=1.10.9
1818
reactor_vesion=3.2.5.RELEASE

kotlinx-coroutines-core/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) {
101101
enableAssertions = true
102102
testLogging.showStandardStreams = true
103103
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
104+
systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
105+
systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10'
104106
}
105107

106108
task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+7-20
Original file line numberDiff line numberDiff line change
@@ -380,26 +380,13 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
380380
// atomic unlock operation that checks that waiters queue is empty
381381
private class UnlockOp(
382382
@JvmField val queue: LockedQueue
383-
) : OpDescriptor() {
384-
override val atomicOp: AtomicOp<*>? get() = null
385-
386-
override fun perform(affected: Any?): Any? {
387-
/*
388-
Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to
389-
make a decision will reach it consistently. It does not matter what is a proposed
390-
decision when this UnlockOp is no longer active, because in this case the following CAS
391-
will fail anyway.
392-
*/
393-
val success = queue.isEmpty
394-
val update: Any = if (success) EMPTY_UNLOCKED else queue
395-
(affected as MutexImpl)._state.compareAndSet(this@UnlockOp, update)
396-
/*
397-
`perform` invocation from the original `unlock` invocation may be coming too late, when
398-
some other thread had already helped to complete it (either successfully or not).
399-
That operation was unsuccessful if `state` was restored to this `queue` reference and
400-
that is what is being checked below.
401-
*/
402-
return if (affected._state.value === queue) UNLOCK_FAIL else null
383+
) : AtomicOp<MutexImpl>() {
384+
override fun prepare(affected: MutexImpl): Any? =
385+
if (queue.isEmpty) null else UNLOCK_FAIL
386+
387+
override fun complete(affected: MutexImpl, failure: Any?) {
388+
val update: Any = if (failure == null) EMPTY_UNLOCKED else queue
389+
affected._state.compareAndSet(this, update)
403390
}
404391
}
405392
}

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+114-50
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,41 @@ public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
8484
}
8585

8686
private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
87+
/*
88+
The queue of waiting acquirers is essentially an infinite array based on the list of segments
89+
(see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
90+
and dequeue operation, we increment the corresponding counter at the beginning of the operation
91+
and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
92+
works with an individual cell. We use the corresponding segment pointers to find the required ones.
93+
94+
Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
95+
can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
96+
so that the state `PERMIT` represents different logical states.
97+
98+
+------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then
99+
| NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets
100+
+------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes.
101+
| |
102+
| | `acquire` request is cancelled and the continuation is
103+
| `release` comes | replaced with a special `CANCEL` token to avoid memory leaks
104+
| to the slot before V
105+
| `acquire` and puts +-----------+ `release` has +--------+
106+
| a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
107+
| slot, waiting for +-----------+ failed +--------+
108+
| `acquire` after
109+
| that.
110+
|
111+
| `acquire` gets +-------+
112+
| +-----------------> | TAKEN | (ELIMINATION HAPPENED)
113+
V | the permit +-------+
114+
+--------+ |
115+
| PERMIT | -<
116+
+--------+ |
117+
| `release` has waited a bounded time, +--------+
118+
+---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
119+
but `acquire` has not come +--------+
120+
*/
87121

88-
// The queue of waiting acquirers is essentially an infinite array based on the list of segments
89-
// (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
90-
// and dequeue operation, we increment the corresponding counter at the beginning of the operation
91-
// and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
92-
// works with an individual cell.We use the corresponding segment pointer to find the required ones.
93122
private val head: AtomicRef<SemaphoreSegment>
94123
private val deqIdx = atomic(0L)
95124
private val tail: AtomicRef<SemaphoreSegment>
@@ -123,74 +152,100 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
123152
override suspend fun acquire() {
124153
val p = _availablePermits.getAndDecrement()
125154
if (p > 0) return // permit acquired
126-
addToQueueAndSuspend()
155+
// While it looks better when the following function is inlined,
156+
// it is important to make `suspend` function invocations in a way
157+
// so that the tail-call optimization can be applied.
158+
acquireSlowPath()
127159
}
128160

129-
override fun release() {
130-
val p = incPermits()
131-
if (p >= 0) return // no waiters
132-
resumeNextFromQueue()
161+
private suspend fun acquireSlowPath() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
162+
while (true) {
163+
if (addAcquireToQueue(cont)) return@sc
164+
val p = _availablePermits.getAndDecrement()
165+
if (p > 0) { // permit acquired
166+
cont.resume(Unit)
167+
return@sc
168+
}
169+
}
133170
}
134171

135-
fun incPermits() = _availablePermits.getAndUpdate { cur ->
136-
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
137-
cur + 1
172+
override fun release() {
173+
while (true) {
174+
val p = _availablePermits.getAndUpdate { cur ->
175+
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
176+
cur + 1
177+
}
178+
if (p >= 0) return
179+
if (tryResumeNextFromQueue()) return
180+
}
138181
}
139182

140-
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@{ cont ->
183+
/**
184+
* Returns `false` if the received permit cannot be used and the calling operation should restart.
185+
*/
186+
private fun addAcquireToQueue(cont: CancellableContinuation<Unit>): Boolean {
141187
val curTail = this.tail.value
142188
val enqIdx = enqIdx.getAndIncrement()
143189
val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
144-
createNewSegment = ::createSegment).run { segment } // cannot be closed
190+
createNewSegment = ::createSegment).segment // cannot be closed
145191
val i = (enqIdx % SEGMENT_SIZE).toInt()
146-
if (segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
147-
// already resumed
192+
// the regular (fast) path -- if the cell is empty, try to install continuation
193+
if (segment.cas(i, null, cont)) { // installed continuation successfully
194+
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
195+
return true
196+
}
197+
// On CAS failure -- the cell must be either PERMIT or BROKEN
198+
// If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
199+
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
148200
cont.resume(Unit)
149-
return@sc
201+
return true
150202
}
151-
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
203+
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
204+
return false // broken cell, need to retry on a different cell
152205
}
153206

154207
@Suppress("UNCHECKED_CAST")
155-
internal fun resumeNextFromQueue() {
156-
try_again@ while (true) {
157-
val curHead = this.head.value
158-
val deqIdx = deqIdx.getAndIncrement()
159-
val id = deqIdx / SEGMENT_SIZE
160-
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
161-
createNewSegment = ::createSegment).run { segment } // cannot be closed
162-
segment.cleanPrev()
163-
if (segment.id > id) {
164-
this.deqIdx.updateIfLower(segment.id * SEGMENT_SIZE)
165-
continue@try_again
208+
private fun tryResumeNextFromQueue(): Boolean {
209+
val curHead = this.head.value
210+
val deqIdx = deqIdx.getAndIncrement()
211+
val id = deqIdx / SEGMENT_SIZE
212+
val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
213+
createNewSegment = ::createSegment).segment // cannot be closed
214+
segment.cleanPrev()
215+
if (segment.id > id) return false
216+
val i = (deqIdx % SEGMENT_SIZE).toInt()
217+
val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
218+
when {
219+
cellState === null -> {
220+
// Acquire has not touched this cell yet, wait until it comes for a bounded time
221+
// The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
222+
repeat(MAX_SPIN_CYCLES) {
223+
if (segment.get(i) === TAKEN) return true
224+
}
225+
// Try to break the slot in order not to wait
226+
return !segment.cas(i, PERMIT, BROKEN)
166227
}
167-
val i = (deqIdx % SEGMENT_SIZE).toInt()
168-
val cont = segment.getAndSet(i, RESUMED)
169-
if (cont === null) return // just resumed
170-
if (cont === CANCELLED) continue@try_again
171-
(cont as CancellableContinuation<Unit>).resume(Unit)
172-
return
228+
cellState === CANCELLED -> return false // the acquire was already cancelled
229+
else -> return (cellState as CancellableContinuation<Unit>).tryResume()
173230
}
174231
}
175232
}
176233

177-
private inline fun AtomicLong.updateIfLower(value: Long): Unit = loop { cur ->
178-
if (cur >= value || compareAndSet(cur, value)) return
234+
private fun CancellableContinuation<Unit>.tryResume(): Boolean {
235+
val token = tryResume(Unit) ?: return false
236+
completeResume(token)
237+
return true
179238
}
180239

181240
private class CancelSemaphoreAcquisitionHandler(
182-
private val semaphore: SemaphoreImpl,
183241
private val segment: SemaphoreSegment,
184242
private val index: Int
185243
) : CancelHandler() {
186244
override fun invoke(cause: Throwable?) {
187-
val p = semaphore.incPermits()
188-
if (p >= 0) return
189-
if (segment.cancel(index)) return
190-
semaphore.resumeNextFromQueue()
245+
segment.cancel(index)
191246
}
192247

193-
override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
248+
override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
194249
}
195250

196251
private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
@@ -202,6 +257,11 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
202257
@Suppress("NOTHING_TO_INLINE")
203258
inline fun get(index: Int): Any? = acquirers[index].value
204259

260+
@Suppress("NOTHING_TO_INLINE")
261+
inline fun set(index: Int, value: Any?) {
262+
acquirers[index].value = value
263+
}
264+
205265
@Suppress("NOTHING_TO_INLINE")
206266
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
207267

@@ -210,19 +270,23 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int)
210270

211271
// Cleans the acquirer slot located by the specified index
212272
// and removes this segment physically if all slots are cleaned.
213-
fun cancel(index: Int): Boolean {
214-
// Try to cancel the slot
215-
val cancelled = getAndSet(index, CANCELLED) !== RESUMED
273+
fun cancel(index: Int) {
274+
// Clean the slot
275+
set(index, CANCELLED)
216276
// Remove this segment if needed
217277
onSlotCleaned()
218-
return cancelled
219278
}
220279

221280
override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
222281
}
223-
224282
@SharedImmutable
225-
private val RESUMED = Symbol("RESUMED")
283+
private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
284+
@SharedImmutable
285+
private val PERMIT = Symbol("PERMIT")
286+
@SharedImmutable
287+
private val TAKEN = Symbol("TAKEN")
288+
@SharedImmutable
289+
private val BROKEN = Symbol("BROKEN")
226290
@SharedImmutable
227291
private val CANCELLED = Symbol("CANCELLED")
228292
@SharedImmutable

kotlinx-coroutines-core/jvm/test/internal/SegmentBasedQueue.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ internal class SegmentBasedQueue<T> {
5151
val segmentOrClosed = this.head.findSegmentAndMoveForward(id = deqIdx, startFrom = curHead, createNewSegment = ::createSegment)
5252
if (segmentOrClosed.isClosed) return null
5353
val s = segmentOrClosed.segment
54-
s.cleanPrev()
5554
if (s.id > deqIdx) continue
5655
var el = s.element.value
5756
if (el === null) {
5857
if (s.element.compareAndSet(null, BROKEN)) continue
5958
else el = s.element.value
6059
}
60+
// The link to the previous segment should be cleaned after retrieving the element;
61+
// otherwise, `close()` cannot clean the slot.
62+
s.cleanPrev()
6163
if (el === BROKEN) continue
6264
@Suppress("UNCHECKED_CAST")
6365
return el as T
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
@file:Suppress("unused")
5+
package kotlinx.coroutines.linearizability
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.sync.*
9+
import org.jetbrains.kotlinx.lincheck.annotations.Operation
10+
import org.jetbrains.kotlinx.lincheck.verifier.*
11+
import org.junit.*
12+
13+
class MutexLCStressTest : VerifierState() {
14+
private val mutex = Mutex()
15+
16+
@Operation
17+
fun tryLock() = mutex.tryLock()
18+
19+
@Operation
20+
suspend fun lock() = mutex.lock()
21+
22+
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
23+
fun unlock() = mutex.unlock()
24+
25+
@Test
26+
fun test() = LCStressOptionsDefault()
27+
.actorsBefore(0)
28+
.check(this::class)
29+
30+
override fun extractState() = mutex.isLocked
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
@file:Suppress("unused")
5+
package kotlinx.coroutines.linearizability
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.sync.*
9+
import org.jetbrains.kotlinx.lincheck.annotations.Operation
10+
import org.jetbrains.kotlinx.lincheck.verifier.*
11+
import org.junit.*
12+
13+
abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
14+
private val semaphore = Semaphore(permits)
15+
16+
@Operation
17+
fun tryAcquire() = semaphore.tryAcquire()
18+
19+
@Operation
20+
suspend fun acquire() = semaphore.acquire()
21+
22+
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
23+
fun release() = semaphore.release()
24+
25+
@Test
26+
fun test() = LCStressOptionsDefault()
27+
.actorsBefore(0)
28+
.check(this::class)
29+
30+
override fun extractState() = semaphore.availablePermits
31+
}
32+
33+
class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1)
34+
class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)

0 commit comments

Comments
 (0)