
Commit 9a62f27

dkhalanskyjb authored and elizarov committed
Fix of a bug in the semaphore (#1477)
* Add a failing test for a semaphore

  This test consistently fails for the current implementation. It attempts to cause the following state: after `job2` increments `availablePermits` but before it wakes up the acquirer in the queue, the acquirer is cancelled. Then, regardless of whether `RESUMED` or `CANCELLED` was written first, another cell in the queue is marked to be resumed. However, this is incorrect: on cancellation, the acquirer incremented the number of available permits once more, making it `1`; thus, two permits for acquiring the mutex exist at the same time. At the next loop iteration, a new acquirer tries to claim ownership of the mutex and succeeds because it goes to the thread queue and sees its cell as `RESUMED`. Thus, two entities own a mutex at the same time.

* Fix a bug in semaphore implementation

  The fix works as follows: if `availablePermits` is negative, its absolute value denotes the logical length of the thread queue. Incrementing it when it was negative means that this thread promises to wake exactly one thread; incrementing it when it was non-negative returns one permit to the semaphore itself.

  Before, the error was that the queue could have a negative length: if it consisted of only `N` cells and `N` resume queries arrived, cancelling any thread would mean that there are more wakers than there are sleepers, which breaks the invariants of the semaphore. Thus, if on cancellation the acquirer detects that it leaves the queue empty in the presence of resumers, it simply transfers the semaphore acquisition permit to the semaphore itself, because it knows that it, in a sense, owns it already: there is a thread that is bound to resume this cell.
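The interleaving described in the commit message can be replayed deterministically with a small single-threaded model. This is only a sketch under stated assumptions: `ModelSemaphore`, `Cell`, and `replay` are hypothetical names invented for illustration, the queue is a plain list rather than the segment queue, and none of this is the actual kotlinx.coroutines code. It keeps the one rule the message relies on, namely that a negative counter is the logical queue length, and toggles the `p >= 0` early return from the diff with a `fixed` flag.

```kotlin
// Simplified model for illustration only; NOT the kotlinx.coroutines implementation.
enum class Cell { EMPTY, WAITING, RESUMED, CANCELLED }

class ModelSemaphore(initial: Int, private val fixed: Boolean) {
    var permits = initial                       // negative value = logical queue length
    private val cells = MutableList(8) { Cell.EMPTY }
    private var enqIdx = 0
    private var deqIdx = 0

    /** Increments the counter and returns its previous value. */
    fun incPermits(): Int = permits++

    /** Returns null if a permit was obtained, or the index of the cell where the caller parked. */
    fun acquire(): Int? {
        val p = permits--
        if (p > 0) return null                  // a free permit was available
        val i = enqIdx++
        if (cells[i] == Cell.RESUMED) return null // a resumer marked this cell in advance
        cells[i] = Cell.WAITING
        return i
    }

    /** Second half of a release: hand the permit to the next cell that is not cancelled. */
    fun resumeNextFromQueue() {
        while (true) {
            val i = deqIdx++
            val prev = cells[i]
            cells[i] = Cell.RESUMED
            if (prev != Cell.CANCELLED) return  // woke a waiter or pre-marked an empty cell
        }
    }

    /** Cancellation handler of a parked acquirer; `fixed` enables the early return from the diff. */
    fun cancel(index: Int) {
        val p = incPermits()
        if (fixed && p >= 0) return             // queue is logically empty: keep the permit in the counter
        val prev = cells[index]
        cells[index] = Cell.CANCELLED
        if (prev != Cell.RESUMED) return        // cancelled before any resumer reached this cell
        resumeNextFromQueue()                   // a resumer was already here: pass its permit on
    }
}

/** Replays the race; returns true if a second owner is admitted while the permit is held. */
fun replay(fixed: Boolean): Boolean {
    val s = ModelSemaphore(initial = 0, fixed = fixed) // the only permit is held by "main"
    val waiter = s.acquire()!!                  // job1 parks: counter 0 -> -1
    val p = s.incPermits()                      // job2.release(), step 1: counter -1 -> 0
    check(p < 0)                                // job2 now owes one wake-up
    s.cancel(waiter)                            // job1 is cancelled in between
    s.resumeNextFromQueue()                     // job2.release(), step 2
    check(s.acquire() == null)                  // main re-acquires the single permit
    return s.acquire() == null                  // a new job1 must not get a permit
}

fun main() {
    println("buggy: second owner admitted = ${replay(fixed = false)}") // prints true
    println("fixed: second owner admitted = ${replay(fixed = true)}")  // prints false
}
```

In the unfixed replay the cancelled acquirer pushes the counter to `1` and the resumer additionally marks the next empty cell as `RESUMED`, so one release yields two permits. With the early return the counter alone carries the permit and the next cell stays empty.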
1 parent 57cc364 commit 9a62f27

2 files changed: +39 -2 lines changed


kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+2 -1
@@ -166,7 +166,8 @@ private class CancelSemaphoreAcquisitionHandler(
     private val index: Int
 ) : CancelHandler() {
     override fun invoke(cause: Throwable?) {
-        semaphore.incPermits()
+        val p = semaphore.incPermits()
+        if (p >= 0) return
         if (segment.cancel(index)) return
         semaphore.resumeNextFromQueue()
     }

kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt

+37 -1
@@ -2,6 +2,7 @@ package kotlinx.coroutines.sync
 
 import kotlinx.coroutines.*
 import org.junit.Test
+import org.junit.After
 import kotlin.test.assertEquals
 
 class SemaphoreStressTest : TestBase() {
@@ -57,4 +58,39 @@ class SemaphoreStressTest : TestBase() {
         semaphore.release()
         assertEquals(1, semaphore.availablePermits)
     }
-}
+
+    /**
+     * This checks if repeated releases that race with cancellations put
+     * the semaphore into an incorrect state where permits are leaked.
+     */
+    @Test
+    fun stressReleaseCancelRace() = runTest {
+        val n = 10_000 * stressTestMultiplier
+        val semaphore = Semaphore(1, 1)
+        newSingleThreadContext("SemaphoreStressTest").use { pool ->
+            repeat (n) {
+                // Initially, we hold the permit and no one else can `acquire`,
+                // otherwise it's a bug.
+                assertEquals(0, semaphore.availablePermits)
+                var job1_entered_critical_section = false
+                val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
+                    semaphore.acquire()
+                    job1_entered_critical_section = true
+                    semaphore.release()
+                }
+                // check that `job1` didn't finish the call to `acquire()`
+                assertEquals(false, job1_entered_critical_section)
+                val job2 = launch(pool) {
+                    semaphore.release()
+                }
+                // Because `job2` executes in a separate thread, this
+                // cancellation races with the call to `release()`.
+                job1.cancelAndJoin()
+                job2.join()
+                assertEquals(1, semaphore.availablePermits)
+                semaphore.acquire()
+            }
+        }
+    }
+
+}
