Skip to content

Commit 953da5c

Browse files
committed
Dirty fix + remove one of the semaphore linearizability tests
1 parent 0d9b789 commit 953da5c

File tree

2 files changed

+44
-43
lines changed

2 files changed

+44
-43
lines changed

kotlinx-coroutines-core/common/src/sync/Semaphore.kt

+41-39
Original file line numberDiff line numberDiff line change
@@ -118,19 +118,7 @@ private class SemaphoreImpl(
118118
if (p > 0) {
119119
// try to get a permit by updating the counter
120120
if (_availablePermits.compareAndSet(p, p - 1)) return true
121-
} else {
122-
// check whether there is a permit in the queue
123-
val enqIdx = this.enqIdx.value
124-
val deqIdx = this.deqIdx.value
125-
if (enqIdx >= deqIdx) return false
126-
// try to get a permit from the queue
127-
if (this.enqIdx.compareAndSet(enqIdx, enqIdx + 1)) {
128-
// update the counter of available permits and check
129-
// whether it was legal to get a permit from the queue
130-
if (_availablePermits.getAndDecrement() <= 0) return true
131-
else release()
132-
}
133-
}
121+
} else return false
134122
}
135123
}
136124

@@ -140,15 +128,19 @@ private class SemaphoreImpl(
140128
addToQueueAndSuspend()
141129
}
142130

143-
override fun release() {
144-
val p = incPermits()
145-
if (p >= 0) return // no waiters
146-
resumeNextFromQueue()
131+
override fun release() = check(releaseImpl()) {
132+
"The number of released permits cannot be greater than $permits"
147133
}
148134

149-
fun incPermits() = _availablePermits.getAndUpdate { cur ->
150-
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
151-
cur + 1
135+
internal fun releaseImpl(): Boolean {
136+
while (true) {
137+
val p = _availablePermits.getAndUpdate { cur ->
138+
if (cur == permits) return false // the number of released permits cannot be greater than `permits`
139+
cur + 1
140+
}
141+
if (p >= 0) return true
142+
if (tryResumeNextFromQueue()) return true
143+
}
152144
}
153145

154146
private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable<Unit> sc@ { cont ->
@@ -158,41 +150,45 @@ private class SemaphoreImpl(
158150
val i = (enqIdx % SEGMENT_SIZE).toInt()
159151
if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) {
160152
// already resumed
153+
segment?.set(i, TAKEN)
161154
cont.resume(Unit)
162155
return@sc
163156
}
164-
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler)
157+
cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
165158
}
166159

167160
@Suppress("UNCHECKED_CAST")
168-
internal fun resumeNextFromQueue() {
169-
try_again@while (true) {
161+
private fun tryResumeNextFromQueue(): Boolean {
170162
val first = this.head
171163
val deqIdx = deqIdx.getAndIncrement()
172-
val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again
164+
val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: return false
173165
val i = (deqIdx % SEGMENT_SIZE).toInt()
174166
val cont = segment.getAndSet(i, RESUMED)
175-
if (cont === null) return // just resumed
176-
if (cont === CANCELLED) continue@try_again
177-
(cont as CancellableContinuation<Unit>).resume(Unit)
178-
return
179-
}
167+
if (cont === CANCELLED) return false
168+
if (cont === null) {
169+
while (segment.get(i) != TAKEN) { /* spin wait */ }
170+
return true
171+
} // just resumed
172+
return (cont as CancellableContinuation<Unit>).tryResume()
180173
}
181174
}
182175

176+
private fun CancellableContinuation<Unit>.tryResume(): Boolean {
177+
val token = tryResume(Unit) ?: return false
178+
completeResume(token)
179+
return true
180+
}
181+
182+
183183
private class CancelSemaphoreAcquisitionHandler(
184-
private val semaphore: SemaphoreImpl,
185184
private val segment: SemaphoreSegment,
186185
private val index: Int
187186
) : CancelHandler() {
188187
override fun invoke(cause: Throwable?) {
189-
val p = semaphore.incPermits()
190-
if (p >= 0) return
191-
if (segment.cancel(index)) return
192-
semaphore.resumeNextFromQueue()
188+
segment.cancel(index)
193189
}
194190

195-
override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]"
191+
override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
196192
}
197193

198194
private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<SemaphoreSegment>(id, prev) {
@@ -203,6 +199,11 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
203199
@Suppress("NOTHING_TO_INLINE")
204200
inline fun get(index: Int): Any? = acquirers[index].value
205201

202+
@Suppress("NOTHING_TO_INLINE")
203+
inline fun set(index: Int, value: Any?) {
204+
acquirers[index].value = value
205+
}
206+
206207
@Suppress("NOTHING_TO_INLINE")
207208
inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
208209

@@ -211,13 +212,12 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
211212

212213
// Cleans the acquirer slot located by the specified index
213214
// and removes this segment physically if all slots are cleaned.
214-
fun cancel(index: Int): Boolean {
215-
// Try to cancel the slot
216-
val cancelled = getAndSet(index, CANCELLED) !== RESUMED
215+
fun cancel(index: Int) {
216+
// Clean the slot
217+
set(index, CANCELLED)
217218
// Remove this segment if needed
218219
if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE)
219220
remove()
220-
return cancelled
221221
}
222222

223223
override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
@@ -226,6 +226,8 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment<Semap
226226
@SharedImmutable
227227
private val RESUMED = Symbol("RESUMED")
228228
@SharedImmutable
229+
private val TAKEN = Symbol("TAKEN")
230+
@SharedImmutable
229231
private val CANCELLED = Symbol("CANCELLED")
230232
@SharedImmutable
231233
private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)

kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ import org.junit.*
1313
abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
1414
private val semaphore = Semaphore(permits)
1515

16-
// @Operation
16+
@Operation
1717
fun tryAcquire() = semaphore.tryAcquire()
1818

19-
@Operation(cancellableOnSuspension = true)
19+
@Operation
2020
suspend fun acquire() = semaphore.acquire()
2121

2222
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
@@ -31,5 +31,4 @@ abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() {
3131
}
3232

3333
class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1)
34-
class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)
35-
class Semaphore4LCStressTest : SemaphoreLCStressTestBase(4)
34+
class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2)

0 commit comments

Comments
 (0)