Skip to content

Commit 9077b01

Browse files
ndkovalelizarov
authored andcommitted
Cancellation in Semaphore should resume the next waiting acquirer if and only if the canceled request has been resumed under a race
1 parent db0ef0c commit 9077b01

File tree

2 files changed

+53
-31
lines changed

2 files changed

+53
-31
lines changed

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+28-30
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,16 @@ private class SemaphoreImpl(
121121
}
122122

123123
override fun release() {
124-
val p = _availablePermits.getAndUpdate { cur ->
125-
check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
126-
cur + 1
127-
}
124+
val p = incPermits()
128125
if (p >= 0) return // no waiters
129126
resumeNextFromQueue()
130127
}
131128

129+
internal fun incPermits() = _availablePermits.getAndUpdate { cur ->
130+
check(cur < permits) { "The number of acquired permits cannot be greater than `permits`" }
131+
cur + 1
132+
}
133+
132134
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
133135
val last = this.tail
134136
val enqIdx = enqIdx.getAndIncrement()
@@ -143,63 +145,59 @@ private class SemaphoreImpl(
143145
}
144146

145147
@Suppress("UNCHECKED_CAST")
146-
private fun resumeNextFromQueue() {
147-
val first = this.head
148-
val deqIdx = deqIdx.getAndIncrement()
149-
val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: return
150-
val i = (deqIdx % SEGMENT_SIZE).toInt()
151-
val cont = segment.getAndUpdate(i) {
152-
// Cancelled continuation invokes `release`
153-
// and resumes next suspended acquirer if needed.
154-
if (it === CANCELLED) return
155-
RESUMED
148+
internal fun resumeNextFromQueue() {
149+
try_again@while (true) {
150+
val first = this.head
151+
val deqIdx = deqIdx.getAndIncrement()
152+
val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again
153+
val i = (deqIdx % SEGMENT_SIZE).toInt()
154+
val cont = segment.getAndSet(i, RESUMED)
155+
if (cont === null) return // just resumed
156+
if (cont === CANCELLED) continue@try_again
157+
(cont as CancellableContinuation<Unit>).resume(Unit)
158+
return
156159
}
157-
if (cont === null) return // just resumed
158-
(cont as CancellableContinuation<Unit>).resume(Unit)
159160
}
160161
}
161162

162163
private class CancelSemaphoreAcquisitionHandler(
163-
private val semaphore: Semaphore,
164+
private val semaphore: SemaphoreImpl,
164165
private val segment: SemaphoreSegment,
165166
private val index: Int
166167
) : CancelHandler() {
167168
override fun invoke(cause: Throwable?) {
168-
segment.cancel(index)
169-
semaphore.release()
169+
semaphore.incPermits()
170+
if (segment.cancel(index)) return
171+
semaphore.resumeNextFromQueue()
170172
}
171173

172174
override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
173175
}
174176

175177
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
176-
private val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
178+
val acquirers = atomicArrayOfNulls<Any?>(SEGMENT_SIZE)
177179

178180
@Suppress("NOTHING_TO_INLINE")
179181
inline fun get(index: Int): Any? = acquirers[index].value
180182

181183
@Suppress("NOTHING_TO_INLINE")
182184
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
183185

184-
inline fun getAndUpdate(index: Int, function: (Any?) -> Any?): Any? {
185-
while (true) {
186-
val cur = acquirers[index].value
187-
val upd = function(cur)
188-
if (cas(index, cur, upd)) return cur
189-
}
190-
}
186+
@Suppress("NOTHING_TO_INLINE")
187+
inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
191188

192189
private val cancelledSlots = atomic(0)
193190
override val removed get() = cancelledSlots.value == SEGMENT_SIZE
194191

195192
// Cleans the acquirer slot located by the specified index
196193
// and removes this segment physically if all slots are cleaned.
197-
fun cancel(index: Int) {
198-
// Clean the specified waiter
199-
acquirers[index].value = CANCELLED
194+
fun cancel(index: Int): Boolean {
195+
// Try to cancel the slot
196+
val cancelled = getAndSet(index, CANCELLED) !== RESUMED
200197
// Remove this segment if needed
201198
if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
202199
remove()
200+
return cancelled
203201
}
204202

205203
override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"

kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt

+25-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class SemaphoreTest : TestBase() {
102102
}
103103

104104
@Test
105-
fun testCancellationReleasesSemaphore() = runTest {
105+
fun testCancellationReturnsPermitBack() = runTest {
106106
val semaphore = Semaphore(1)
107107
semaphore.acquire()
108108
assertEquals(0, semaphore.availablePermits)
@@ -116,4 +116,28 @@ class SemaphoreTest : TestBase() {
116116
semaphore.release()
117117
assertEquals(1, semaphore.availablePermits)
118118
}
119+
120+
@Test
121+
fun testCancellationDoesNotResumeWaitingAcquirers() = runTest {
122+
val semaphore = Semaphore(1)
123+
semaphore.acquire()
124+
val job1 = launch { // 1st job in the waiting queue
125+
expect(2)
126+
semaphore.acquire()
127+
expectUnreached()
128+
}
129+
val job2 = launch { // 2nd job in the waiting queue
130+
expect(3)
131+
semaphore.acquire()
132+
expectUnreached()
133+
}
134+
expect(1)
135+
yield()
136+
expect(4)
137+
job2.cancel()
138+
yield()
139+
expect(5)
140+
job1.cancel()
141+
finish(6)
142+
}
119143
}

0 commit comments

Comments
 (0)