Skip to content

Commit 7e762d3

Browse files
authored
Optimize mutex implementation (#2851)
* Get rid of addLastIf and DCSS primitive during contention * Leverage constants returned by tryResume* and simplify signatures
1 parent 8baa736 commit 7e762d3

File tree

2 files changed

+51
-23
lines changed

2 files changed

+51
-23
lines changed

kotlinx-coroutines-core/common/src/CancellableContinuation.kt

+4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
8181
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
8282
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
8383
* guaranteed can be provided by having a cancellation fallback.
84+
*
85+
* Implementation note: current implementation always returns RESUME_TOKEN or `null`
86+
*
87+
* @suppress **This is unstable API and it is subject to change.**
8488
*/
8589
@InternalCoroutinesApi
8690
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+47-23
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.*
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlinx.coroutines.selects.*
1212
import kotlin.contracts.*
13-
import kotlin.coroutines.*
1413
import kotlin.jvm.*
1514
import kotlin.native.concurrent.*
1615

@@ -124,8 +123,6 @@ private val LOCK_FAIL = Symbol("LOCK_FAIL")
124123
@SharedImmutable
125124
private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL")
126125
@SharedImmutable
127-
private val SELECT_SUCCESS = Symbol("SELECT_SUCCESS")
128-
@SharedImmutable
129126
private val LOCKED = Symbol("LOCKED")
130127
@SharedImmutable
131128
private val UNLOCKED = Symbol("UNLOCKED")
@@ -191,7 +188,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
191188
}
192189

193190
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
194-
val waiter = LockCont(owner, cont)
191+
var waiter = LockCont(owner, cont)
195192
_state.loop { state ->
196193
when (state) {
197194
is Empty -> {
@@ -210,11 +207,24 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
210207
is LockedQueue -> {
211208
val curOwner = state.owner
212209
check(curOwner !== owner) { "Already locked by $owner" }
213-
if (state.addLastIf(waiter) { _state.value === state }) {
214-
// added to waiter list!
210+
211+
state.addLast(waiter)
212+
/*
213+
* If the state has been changed while we were adding the waiter,
214+
* it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten.
215+
* To rendezvous that, we try to "invalidate" our node and go for retry.
216+
*
217+
* Node has to be re-instantiated as we do not support node re-adding, even to
218+
* another list
219+
*/
220+
if (_state.value === state || !waiter.take()) {
221+
// added to waiter list
215222
cont.removeOnCancellation(waiter)
216223
return@sc
217224
}
225+
226+
waiter = LockCont(owner, cont)
227+
return@loop
218228
}
219229
is OpDescriptor -> state.perform(this) // help
220230
else -> error("Illegal state $state")
@@ -252,8 +262,17 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
252262
is LockedQueue -> {
253263
check(state.owner !== owner) { "Already locked by $owner" }
254264
val node = LockSelect(owner, select, block)
255-
if (state.addLastIf(node) { _state.value === state }) {
256-
// successfully enqueued
265+
/*
266+
* If the state has been changed while we were adding the waiter,
267+
* it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten.
268+
* To rendezvous that, we try to "invalidate" our node and go for retry.
269+
*
270+
* Node has to be re-instantiated as we do not support node re-adding, even to
271+
* another list
272+
*/
273+
state.addLast(node)
274+
if (_state.value === state || !node.take()) {
275+
// added to waiter list
257276
select.disposeOnSelect(node)
258277
return
259278
}
@@ -300,7 +319,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
300319
}
301320
}
302321

303-
public override fun unlock(owner: Any?) {
322+
override fun unlock(owner: Any?) {
304323
_state.loop { state ->
305324
when (state) {
306325
is Empty -> {
@@ -319,10 +338,9 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
319338
val op = UnlockOp(state)
320339
if (_state.compareAndSet(state, op) && op.perform(this) == null) return
321340
} else {
322-
val token = (waiter as LockWaiter).tryResumeLockWaiter()
323-
if (token != null) {
341+
if ((waiter as LockWaiter).tryResumeLockWaiter()) {
324342
state.owner = waiter.owner ?: LOCKED
325-
waiter.completeResumeLockWaiter(token)
343+
waiter.completeResumeLockWaiter()
326344
return
327345
}
328346
}
@@ -352,31 +370,37 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
352370
private abstract inner class LockWaiter(
353371
@JvmField val owner: Any?
354372
) : LockFreeLinkedListNode(), DisposableHandle {
373+
private val isTaken = atomic<Boolean>(false)
374+
fun take(): Boolean = isTaken.compareAndSet(false, true)
355375
final override fun dispose() { remove() }
356-
abstract fun tryResumeLockWaiter(): Any?
357-
abstract fun completeResumeLockWaiter(token: Any)
376+
abstract fun tryResumeLockWaiter(): Boolean
377+
abstract fun completeResumeLockWaiter()
358378
}
359379

360380
private inner class LockCont(
361381
owner: Any?,
362-
@JvmField val cont: CancellableContinuation<Unit>
382+
private val cont: CancellableContinuation<Unit>
363383
) : LockWaiter(owner) {
364-
override fun tryResumeLockWaiter() = cont.tryResume(Unit, idempotent = null) {
365-
// if this continuation gets cancelled during dispatch to the caller, then release the lock
366-
unlock(owner)
384+
385+
override fun tryResumeLockWaiter(): Boolean {
386+
if (!take()) return false
387+
return cont.tryResume(Unit, idempotent = null) {
388+
// if this continuation gets cancelled during dispatch to the caller, then release the lock
389+
unlock(owner)
390+
} != null
367391
}
368-
override fun completeResumeLockWaiter(token: Any) = cont.completeResume(token)
369-
override fun toString(): String = "LockCont[$owner, $cont] for ${this@MutexImpl}"
392+
393+
override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN)
394+
override fun toString(): String = "LockCont[$owner, ${cont}] for ${this@MutexImpl}"
370395
}
371396

372397
private inner class LockSelect<R>(
373398
owner: Any?,
374399
@JvmField val select: SelectInstance<R>,
375400
@JvmField val block: suspend (Mutex) -> R
376401
) : LockWaiter(owner) {
377-
override fun tryResumeLockWaiter(): Any? = if (select.trySelect()) SELECT_SUCCESS else null
378-
override fun completeResumeLockWaiter(token: Any) {
379-
assert { token === SELECT_SUCCESS }
402+
override fun tryResumeLockWaiter(): Boolean = take() && select.trySelect()
403+
override fun completeResumeLockWaiter() {
380404
block.startCoroutineCancellable(receiver = this@MutexImpl, completion = select.completion) {
381405
// if this continuation gets cancelled during dispatch to the caller, then release the lock
382406
unlock(owner)

0 commit comments

Comments
 (0)