Skip to content

Commit 999a304

Browse files
committed
Unlock Mutex and release Semaphore during cancellation on a fast branch of slow-path in Mutex/Semaphore
Fixes #2390
1 parent a5dfc23 commit 999a304

File tree

5 files changed

+42
-51
lines changed

5 files changed

+42
-51
lines changed

kotlinx-coroutines-core/common/src/sync/Mutex.kt

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,17 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
201201
// try lock
202202
val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
203203
if (_state.compareAndSet(state, update)) { // locked
204-
val token = cont.tryResume(Unit, idempotent = null) {
205-
// if this continuation gets cancelled during dispatch to the caller, then release
206-
// the lock
207-
unlock(owner)
208-
}
209-
if (token != null) {
210-
cont.completeResume(token)
211-
} else {
212-
// failure to get token implies already cancelled
213-
unlock(owner)
214-
}
204+
// TODO implement functional type in LockCont as soon as we get rid of legacy JS
205+
cont.resume(Unit) { unlock(owner) }
215206
return@sc
216207
}
217208
}

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
172172
if (addAcquireToQueue(cont)) return@sc
173173
val p = _availablePermits.getAndDecrement()
174174
if (p > 0) { // permit acquired
175-
cont.resume(Unit)
175+
cont.resume(Unit, onCancellationRelease)
176176
return@sc
177177
}
178178
}
@@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
206206
// On CAS failure -- the cell must be either PERMIT or BROKEN
207207
// If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
208208
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
209-
// The following resume must always succeed, since continuation was not published yet and we don't have
210-
// to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled
211-
cont.resume(Unit)
209+
/// This continuation is not yet published, but still can be cancelled via outer job
210+
cont.resume(Unit, onCancellationRelease)
212211
return true
213212
}
214213
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it

kotlinx-coroutines-core/common/test/sync/MutexTest.kt

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ import kotlinx.coroutines.*
99
import kotlin.test.*
1010

1111
class MutexTest : TestBase() {
12-
private val enterCount = atomic(0)
13-
private val releasedCount = atomic(0)
14-
1512
@Test
1613
fun testSimple() = runTest {
1714
val mutex = Mutex()
@@ -110,35 +107,4 @@ class MutexTest : TestBase() {
110107
assertFalse(mutex.holdsLock(firstOwner))
111108
assertFalse(mutex.holdsLock(secondOwner))
112109
}
113-
114-
@Test
115-
fun cancelLock() = runTest() {
116-
val mutex = Mutex()
117-
enterCount.value = 0
118-
releasedCount.value = 0
119-
repeat(1000) {
120-
val job = launch(Dispatchers.Default) {
121-
val owner = Any()
122-
try {
123-
enterCount.incrementAndGet()
124-
mutex.withLock(owner) {}
125-
// repeat to give an increase in race probability
126-
mutex.withLock(owner) {}
127-
} finally {
128-
// should be no way lock is still held by owner here
129-
if (mutex.holdsLock(owner)) {
130-
// if it is held, ensure test case doesn't lockup
131-
mutex.unlock(owner)
132-
} else {
133-
releasedCount.incrementAndGet()
134-
}
135-
}
136-
}
137-
mutex.withLock {
138-
job.cancel()
139-
}
140-
job.join()
141-
}
142-
assertEquals(enterCount.value, releasedCount.value)
143-
}
144110
}

kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,21 @@ class MutexStressTest : TestBase() {
9090
}
9191
}
9292
}
93-
}
93+
94+
@Test
95+
fun testShouldBeUnlockedOnCancellation() = runTest {
96+
val mutex = Mutex()
97+
val n = 1000 * stressTestMultiplier
98+
repeat(n) {
99+
val job = launch(Dispatchers.Default) {
100+
mutex.lock()
101+
mutex.unlock()
102+
}
103+
mutex.withLock {
104+
job.cancel()
105+
}
106+
job.join()
107+
assertFalse { mutex.isLocked }
108+
}
109+
}
110+
}

kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package kotlinx.coroutines.sync
22

33
import kotlinx.coroutines.*
44
import org.junit.Test
5-
import kotlin.test.assertEquals
5+
import kotlin.test.*
66

77
class SemaphoreStressTest : TestBase() {
88
@Test
@@ -90,4 +90,22 @@ class SemaphoreStressTest : TestBase() {
9090
}
9191
}
9292
}
93+
94+
@Test
95+
fun testShouldBeUnlockedOnCancellation() = runTest {
96+
val semaphore = Semaphore(1)
97+
val n = 1000 * stressTestMultiplier
98+
repeat(n) {
99+
val job = launch(Dispatchers.Default) {
100+
semaphore.acquire()
101+
semaphore.release()
102+
}
103+
semaphore.withPermit {
104+
job.cancel()
105+
106+
}
107+
job.join()
108+
assertTrue { semaphore.availablePermits == 1 }
109+
}
110+
}
93111
}

0 commit comments

Comments
 (0)