Skip to content

Commit 288499e

Browse files
ndkovalqwwdfsad
authored andcommitted
Mutex and Semaphore review
1 parent d1c04b6 commit 288499e

File tree

8 files changed

+114
-32
lines changed

8 files changed

+114
-32
lines changed

kotlinx-coroutines-core/common/src/JobSupport.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -1205,7 +1205,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12051205
/**
12061206
* @suppress **This is unstable API and it is subject to change.**
12071207
*/
1208-
internal suspend fun awaitInternal(): Any? {
1208+
protected suspend fun awaitInternal(): Any? {
12091209
// fast-path -- check state (avoid extra object creation)
12101210
while (true) { // lock-free loop on state
12111211
val state = this.state
@@ -1235,9 +1235,8 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
12351235
cont.getResult()
12361236
}
12371237

1238-
12391238
@Suppress("UNCHECKED_CAST")
1240-
internal val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
1239+
protected val onAwaitInternal: SelectClause1<*> get() = SelectClause1Impl<Any?>(
12411240
clauseObject = this@JobSupport,
12421241
regFunc = JobSupport::onAwaitInternalRegFunc as RegistrationFunction,
12431242
processResFunc = JobSupport::onAwaitInternalProcessResFunc as ProcessResultFunction

kotlinx-coroutines-core/common/src/internal/LocalAtomics.common.kt

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ internal expect class LocalAtomicInt(value: Int) {
1414
fun get(): Int
1515
fun set(value: Int)
1616
fun decrementAndGet(): Int
17+
fun incrementAndGet(): Int
1718
}
1819

1920
internal inline var LocalAtomicInt.value

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+32-13
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import kotlinx.coroutines.*
99
import kotlinx.coroutines.internal.*
1010
import kotlinx.coroutines.selects.*
1111
import kotlin.contracts.*
12+
import kotlin.jvm.*
1213
import kotlin.native.concurrent.*
1314

1415
/**
@@ -20,7 +21,7 @@ import kotlin.native.concurrent.*
2021
*
2122
* JVM API note:
2223
* Memory semantic of the [Mutex] is similar to `synchronized` block on JVM:
23-
* An unlock on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
24+
* An unlock operation on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
2425
* Unsuccessful call to [tryLock] do not have any memory effects.
2526
*/
2627
public interface Mutex {
@@ -171,9 +172,12 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
171172

172173
override fun tryLock(owner: Any?): Boolean =
173174
if (tryAcquire()) {
175+
assert { this.owner.value === NO_OWNER }
174176
this.owner.value = owner
175177
true
176-
} else false
178+
} else {
179+
false
180+
}
177181

178182
override fun unlock(owner: Any?) {
179183
while (true) {
@@ -192,7 +196,7 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
192196
}
193197
}
194198

195-
@Suppress("UNCHECKED_CAST")
199+
@Suppress("UNCHECKED_CAST", "OverridingDeprecatedMember")
196200
override val onLock: SelectClause2<Any?, Mutex> get() = SelectClause2Impl(
197201
clauseObject = this,
198202
regFunc = MutexImpl::onLockRegFunction as RegistrationFunction,
@@ -201,52 +205,67 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
201205
)
202206

203207
protected open fun onLockRegFunction(select: SelectInstance<*>, owner: Any?) {
204-
onAcquire.regFunc(this, SelectInstanceWithOwner(select, owner), owner)
208+
onAcquireRegFunction(SelectInstanceWithOwner(select, owner), owner)
205209
}
206210

207211
protected open fun onLockProcessResult(owner: Any?, result: Any?): Any? {
208-
onAcquire.processResFunc(this, null, result)
209212
return this
210213
}
211214

212-
protected val onCancellationUnlockConstructor: OnCancellationConstructor =
213-
{ select: SelectInstance<*>, owner: Any?, ignoredResult: Any? ->
215+
private val onCancellationUnlockConstructor: OnCancellationConstructor =
216+
{ _: SelectInstance<*>, owner: Any?, _: Any? ->
214217
{ unlock(owner) }
215218
}
216219

217220
private inner class CancellableContinuationWithOwner(
221+
@JvmField
218222
val cont: CancellableContinuation<Unit>,
223+
@JvmField
219224
val owner: Any?
220225
) : CancellableContinuation<Unit> by cont {
221226
override fun tryResume(value: Unit, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? {
222-
val token = cont.tryResume(value, idempotent, onCancellation)
223-
if (token !== null) this@MutexImpl.owner.value = owner
227+
assert { this@MutexImpl.owner.value === NO_OWNER }
228+
val token = cont.tryResume(value, idempotent) {
229+
assert { this@MutexImpl.owner.value.let { it === NO_OWNER ||it === owner } }
230+
this@MutexImpl.owner.value = owner
231+
unlock(owner)
232+
}
233+
if (token != null) {
234+
assert { this@MutexImpl.owner.value === NO_OWNER }
235+
this@MutexImpl.owner.value = owner
236+
}
224237
return token
225238
}
226239

227240
override fun resume(value: Unit, onCancellation: ((cause: Throwable) -> Unit)?) {
241+
assert { this@MutexImpl.owner.value === NO_OWNER }
228242
this@MutexImpl.owner.value = owner
229-
cont.resume(value, onCancellation)
243+
cont.resume(value) { unlock(owner) }
230244
}
231245
}
232246

233247
private inner class SelectInstanceWithOwner<Q>(
248+
@JvmField
234249
val select: SelectInstance<Q>,
250+
@JvmField
235251
val owner: Any?
236252
) : SelectInstance<Q> by select {
237253
override fun trySelect(clauseObject: Any, result: Any?): Boolean {
254+
assert { this@MutexImpl.owner.value === NO_OWNER }
255+
this@MutexImpl.owner.value = owner
238256
return select.trySelect(clauseObject, result).also { success ->
239-
if (success) this@MutexImpl.owner.value = owner
257+
if (!success) this@MutexImpl.owner.value = NO_OWNER
240258
}
241259
}
242260

243261
override fun selectInRegistrationPhase(internalResult: Any?) {
244-
select.selectInRegistrationPhase(internalResult)
262+
assert { this@MutexImpl.owner.value === NO_OWNER }
245263
this@MutexImpl.owner.value = owner
264+
select.selectInRegistrationPhase(internalResult)
246265
}
247266
}
248267

249-
override fun toString() = "Mutex@${hexAddress}"
268+
override fun toString() = "Mutex@${hexAddress}[isLocked=$isLocked,owner=${owner.value}]"
250269
}
251270

252271
@SharedImmutable

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+2-12
Original file line numberDiff line numberDiff line change
@@ -215,25 +215,15 @@ internal open class SemaphoreImpl(private val permits: Int, acquiredPermits: Int
215215
}
216216
}
217217

218-
val onAcquire: SelectClause1<Semaphore> get() = SelectClause1Impl(
219-
clauseObject = this,
220-
regFunc = SemaphoreImpl::onAcquireRegFunction as RegistrationFunction,
221-
processResFunc = SemaphoreImpl::onAcquireProcessResultFunction as ProcessResultFunction
222-
)
223-
218+
// We do not fully support `onAcquire` as it is needed only for `Mutex.onLock`.
224219
@Suppress("UNUSED_PARAMETER")
225-
private fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
220+
protected fun onAcquireRegFunction(select: SelectInstance<*>, ignoredParam: Any?) =
226221
acquire(
227222
waiter = select,
228223
suspend = { s -> addAcquireToQueue(s) },
229224
onAcquired = { s -> s.selectInRegistrationPhase(Unit) }
230225
)
231226

232-
@Suppress("UNUSED_PARAMETER", "RedundantNullableReturnType")
233-
private fun onAcquireProcessResultFunction(param: Any?, result: Any?): Any? {
234-
return this
235-
}
236-
237227
/**
238228
* Decrements the number of available permits
239229
* and ensures that it is not greater than [permits]

kotlinx-coroutines-core/js/src/internal/LocalAtomics.kt

+1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ internal actual class LocalAtomicInt actual constructor(private var value: Int)
1212
actual fun get(): Int = value
1313

1414
actual fun decrementAndGet(): Int = --value
15+
actual fun incrementAndGet(): Int = ++value
1516
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.internal.*
7+
import kotlinx.coroutines.selects.*
8+
import kotlinx.coroutines.sync.*
9+
import org.junit.*
10+
import java.util.concurrent.*
11+
12+
class MutexStressTest : TestBase() {
13+
@Test
14+
fun testStressCancellationDoesNotBreakMutex() = runTest {
15+
val mutex = Mutex()
16+
val mutexJobNumber = 3
17+
val mutexOwners = Array(mutexJobNumber) { "$it" }
18+
val dispatcher = Executors.newFixedThreadPool(mutexJobNumber + 2).asCoroutineDispatcher()
19+
var counter = 0
20+
val counterLocal = Array(mutexJobNumber) { LocalAtomicInt(0) }
21+
val completed = LocalAtomicInt(0)
22+
val mutexJobLauncher: (jobNumber: Int) -> Job = { jobId ->
23+
val coroutineName = "MutexJob-$jobId"
24+
launch(dispatcher + CoroutineName(coroutineName)) {
25+
while (completed.value == 0) {
26+
mutex.holdsLock(mutexOwners[(jobId + 1) % mutexJobNumber])
27+
if (mutex.tryLock(mutexOwners[jobId])) {
28+
counterLocal[jobId].incrementAndGet()
29+
counter++
30+
mutex.unlock(mutexOwners[jobId])
31+
}
32+
mutex.withLock(mutexOwners[jobId]) {
33+
counterLocal[jobId].incrementAndGet()
34+
counter++
35+
}
36+
select<Unit> {
37+
mutex.onLock(mutexOwners[jobId]) {
38+
counterLocal[jobId].incrementAndGet()
39+
counter++
40+
mutex.unlock(mutexOwners[jobId])
41+
}
42+
}
43+
}
44+
}
45+
}
46+
val mutexJobs = (0 until mutexJobNumber).map { mutexJobLauncher(it) }.toMutableList()
47+
val checkProgressJob = launch(dispatcher + CoroutineName("checkProgressJob")) {
48+
var lastCounterLocalSnapshot = (0 until mutexJobNumber).map { 0 }
49+
while (completed.value == 0) {
50+
delay(500)
51+
val c = counterLocal.map { it.value }
52+
for (i in 0 until mutexJobNumber) {
53+
assert(c[i] > lastCounterLocalSnapshot[i]) { "No progress in MutexJob-$i" }
54+
}
55+
lastCounterLocalSnapshot = c
56+
}
57+
}
58+
val cancellationJob = launch(dispatcher + CoroutineName("cancellationJob")) {
59+
var cancellingJobId = 0
60+
while (completed.value == 0) {
61+
val jobToCancel = mutexJobs.removeFirst()
62+
jobToCancel.cancelAndJoin()
63+
mutexJobs += mutexJobLauncher(cancellingJobId)
64+
cancellingJobId = (cancellingJobId + 1) % mutexJobNumber
65+
}
66+
}
67+
delay(2000L * stressTestMultiplier)
68+
completed.value = 1
69+
cancellationJob.join()
70+
mutexJobs.forEach { it.join() }
71+
checkProgressJob.join()
72+
check(counter == counterLocal.sumOf { it.value })
73+
dispatcher.close()
74+
}
75+
}

kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt

-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines.lincheck
66

77
import kotlinx.coroutines.*
8-
import kotlinx.coroutines.selects.*
98
import kotlinx.coroutines.sync.*
109
import org.jetbrains.kotlinx.lincheck.*
1110
import org.jetbrains.kotlinx.lincheck.annotations.Operation
@@ -20,9 +19,6 @@ abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest()
2019
@Operation(promptCancellation = true, allowExtraSuspension = true)
2120
suspend fun acquire() = semaphore.acquire()
2221

23-
@Operation(promptCancellation = true, allowExtraSuspension = true)
24-
suspend fun onAcquire(): Unit = select { semaphore.onAcquire {} }
25-
2622
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
2723
fun release() = semaphore.release()
2824

kotlinx-coroutines-core/native/src/internal/LocalAtomics.kt

+1
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ internal actual class LocalAtomicInt actual constructor(value: Int) {
1717
actual fun get(): Int = iRef.value
1818

1919
actual fun decrementAndGet(): Int = iRef.decrementAndGet()
20+
actual fun incrementAndGet(): Int = iRef.incrementAndGet()
2021
}

0 commit comments

Comments
 (0)