Skip to content

Pr/2390 #2396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 18, 2020
Merged

Pr/2390 #2396

Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// try lock
val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
if (_state.compareAndSet(state, update)) { // locked
cont.resume(Unit)
// TODO implement functional type in LockCont as soon as we get rid of legacy JS
cont.resume(Unit) { unlock(owner) }
return@sc
}
}
Expand Down
7 changes: 3 additions & 4 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
if (addAcquireToQueue(cont)) return@sc
val p = _availablePermits.getAndDecrement()
if (p > 0) { // permit acquired
cont.resume(Unit)
cont.resume(Unit, onCancellationRelease)
return@sc
}
}
Expand Down Expand Up @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
// On CAS failure -- the cell must be either PERMIT or BROKEN
// If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
// The following resume must always succeed, since continuation was not published yet and we don't have
// to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled
cont.resume(Unit)
/// This continuation is not yet published, but still can be cancelled via outer job
cont.resume(Unit, onCancellationRelease)
return true
}
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/test/sync/MutexTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.sync

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -106,4 +107,4 @@ class MutexTest : TestBase() {
assertFalse(mutex.holdsLock(firstOwner))
assertFalse(mutex.holdsLock(secondOwner))
}
}
}
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() {
}
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val mutex = Mutex()
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
mutex.lock()
mutex.unlock()
}
mutex.withLock {
job.cancel()
}
job.join()
assertFalse { mutex.isLocked }
}
}
}
20 changes: 19 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync

import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.*

class SemaphoreStressTest : TestBase() {
@Test
Expand Down Expand Up @@ -90,4 +90,22 @@ class SemaphoreStressTest : TestBase() {
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val semaphore = Semaphore(1)
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
semaphore.acquire()
semaphore.release()
}
semaphore.withPermit {
job.cancel()

}
job.join()
assertTrue { semaphore.availablePermits == 1 }
}
}
}