Skip to content

Commit 11c140a

Browse files
committed
Fixed StackOverflow on Mutex.unlock convoy for Unconfined coroutines
Fixes #80
1 parent cf8403c commit 11c140a

File tree

3 files changed

+87
-3
lines changed
  • kotlinx-coroutines-core/src

3 files changed

+87
-3
lines changed

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

+9
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ public interface Job : CoroutineContext.Element {
9696
*/
9797
@Deprecated("Replaced with top-level function", level = DeprecationLevel.HIDDEN)
9898
public operator fun invoke(parent: Job? = null): Job = Job(parent)
99+
100+
init {
101+
/*
102+
* Here we make sure that CoroutineExceptionHandler is always initialized in advance, so
103+
* that if a coroutine fails due to StackOverflowError we don't fail to report this error
104+
* trying to initialize CoroutineExceptionHandler
105+
*/
106+
CoroutineExceptionHandler
107+
}
99108
}
100109

101110
// ------------ state query ------------

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt

+61-2
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,19 @@ internal class MutexImpl(locked: Boolean) : Mutex {
136136
@Volatile
137137
private var _state: Any? = if (locked) EmptyLocked else EmptyUnlocked // shared objects while we have no waiters
138138

139+
// resumeNext is: RESUME_QUIESCENT | RESUME_ACTIVE | ResumeReq
140+
@Volatile
141+
private var resumeNext: Any = RESUME_QUIESCENT
142+
139143
private companion object {
140144
@JvmField
141145
val STATE: AtomicReferenceFieldUpdater<MutexImpl, Any?> =
142146
AtomicReferenceFieldUpdater.newUpdater(MutexImpl::class.java, Any::class.java, "_state")
143147

148+
@JvmField
149+
val RESUME_NEXT: AtomicReferenceFieldUpdater<MutexImpl, Any> =
150+
AtomicReferenceFieldUpdater.newUpdater(MutexImpl::class.java, Any::class.java, "resumeNext")
151+
144152
@JvmField
145153
val LOCK_FAIL = Symbol("LOCK_FAIL")
146154

@@ -165,6 +173,11 @@ internal class MutexImpl(locked: Boolean) : Mutex {
165173
@JvmField
166174
val EmptyUnlocked = Empty(UNLOCKED)
167175

176+
@JvmField
177+
val RESUME_QUIESCENT = Symbol("RESUME_QUIESCENT")
178+
179+
@JvmField
180+
val RESUME_ACTIVE = Symbol("RESUME_ACTIVE")
168181
}
169182

170183
public override val isLocked: Boolean get() {
@@ -347,9 +360,17 @@ internal class MutexImpl(locked: Boolean) : Mutex {
347360
if (STATE.compareAndSet(this, state, op) && op.perform(this) == null) return
348361
} else {
349362
val token = (waiter as LockWaiter).tryResumeLockWaiter()
350-
if (token != null) { // successfully resumed waiter that now is holding the lock
363+
if (token != null) {
364+
// successfully resumed waiter that now is holding the lock
365+
// we must immediately transfer ownership to the next waiter, because this coroutine
366+
// might try to lock it again after unlock returns do to StackOverflow avoidance code
367+
// and its attempts to take a lock must be queued.
351368
state.owner = waiter.owner ?: LOCKED
352-
waiter.completeResumeLockWaiter(token)
369+
// StackOverflow avoidance code
370+
if (startResumeNext(waiter, token)) {
371+
waiter.completeResumeLockWaiter(token)
372+
finishResumeNext()
373+
}
353374
return
354375
}
355376
}
@@ -359,6 +380,44 @@ internal class MutexImpl(locked: Boolean) : Mutex {
359380
}
360381
}
361382

383+
private class ResumeReq(
384+
@JvmField val waiter: LockWaiter,
385+
@JvmField val token: Any
386+
)
387+
388+
private fun startResumeNext(waiter: LockWaiter, token: Any): Boolean {
389+
while (true) { // lock-free loop on resumeNext
390+
val resumeNext = this.resumeNext
391+
when {
392+
resumeNext === RESUME_QUIESCENT -> {
393+
// this is never concurrent, because only one thread is holding mutex and trying to resume
394+
// next waiter, so no need to CAS here
395+
this.resumeNext = RESUME_ACTIVE
396+
return true
397+
}
398+
resumeNext === RESUME_ACTIVE ->
399+
if (RESUME_NEXT.compareAndSet(this, resumeNext, ResumeReq(waiter, token))) return false
400+
else -> error("Cannot happen")
401+
}
402+
}
403+
}
404+
405+
private fun finishResumeNext() {
406+
while (true) { // lock-free loop on resumeNext, also a resumption loop to fulfill requests of inner resume invokes
407+
val resumeNext = this.resumeNext
408+
when {
409+
resumeNext === RESUME_ACTIVE ->
410+
if (RESUME_NEXT.compareAndSet(this, resumeNext, RESUME_QUIESCENT)) return
411+
resumeNext is ResumeReq -> {
412+
// this is never concurrently, only one thread is finishing, so no need to CAS here
413+
this.resumeNext = RESUME_ACTIVE
414+
resumeNext.waiter.completeResumeLockWaiter(resumeNext.token)
415+
}
416+
else -> error("Cannot happen")
417+
}
418+
}
419+
}
420+
362421
override fun toString(): String {
363422
while (true) {
364423
val state = this._state

kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/sync/MutexTest.kt

+17-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental.sync
1818

19-
import guide.sync.example06.mutex
2019
import kotlinx.coroutines.experimental.*
20+
import org.hamcrest.core.IsEqual
2121
import org.junit.Assert.*
2222
import org.junit.Test
2323

@@ -91,4 +91,20 @@ class MutexTest : TestBase() {
9191
println("Shared value = $shared")
9292
assertEquals(n * k, shared)
9393
}
94+
95+
@Test
96+
fun testUnconfinedStackOverflow() {
97+
val waiters = 10000
98+
val mutex = Mutex(true)
99+
var done = 0
100+
repeat(waiters) {
101+
launch(Unconfined) { // a lot of unconfined waiters
102+
mutex.withLock {
103+
done++
104+
}
105+
}
106+
}
107+
mutex.unlock() // should not produce StackOverflowError
108+
assertThat(done, IsEqual(waiters))
109+
}
94110
}

0 commit comments

Comments
 (0)