From 02dfe6b910e6b289bca66ce9f7f86735ecd2b2e5 Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Wed, 18 Sep 2019 23:21:56 +0300 Subject: [PATCH 1/8] Added semaphore ability to specify the number of permits to acquire and release --- .../common/src/sync/Semaphore.kt | 45 ++++++++++--------- .../common/test/sync/SemaphoreTest.kt | 23 ++++++++++ 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index a9df15cf49..9597a989f6 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -23,7 +23,7 @@ public interface Semaphore { public val availablePermits: Int /** - * Acquires a permit from this semaphore, suspending until one is available. + * Acquires the given number of permits from this semaphore, suspending until ones are available. * All suspending acquirers are processed in first-in-first-out (FIFO) order. * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this @@ -36,23 +36,28 @@ public interface Semaphore { * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically * check for cancellation in tight loops if needed. * - * Use [tryAcquire] to try acquire a permit of this semaphore without suspension. + * Use [tryAcquire] to try acquire the given number of permits of this semaphore without suspension. + * + * @param permits the number of permits to acquire */ - public suspend fun acquire() + public suspend fun acquire(permits: Int = 1) /** - * Tries to acquire a permit from this semaphore without suspension. + * Tries to acquire the given number of permits from this semaphore without suspension. * - * @return `true` if a permit was acquired, `false` otherwise. + * @param permits the number of permits to acquire + * @return `true` if all permits were acquired, `false` otherwise. */ - public fun tryAcquire(): Boolean + public fun tryAcquire(permits: Int = 1): Boolean /** - * Releases a permit, returning it into this semaphore. Resumes the first - * suspending acquirer if there is one at the point of invocation. - * Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. + * Releases the given number of permits, returning them into this semaphore. Resumes the first + * suspending acquirer if there is one at the point of invocation and the requested number of permits are available. + * + * @param permits the number of permits to release + * @throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. */ - public fun release() + public fun release(permits: Int = 1) } /** @@ -107,28 +112,28 @@ private class SemaphoreImpl( private val enqIdx = atomic(0L) private val deqIdx = atomic(0L) - override fun tryAcquire(): Boolean { + override fun tryAcquire(permits: Int): Boolean { _availablePermits.loop { p -> - if (p <= 0) return false - if (_availablePermits.compareAndSet(p, p - 1)) return true + if (p < permits) return false + if (_availablePermits.compareAndSet(p, p - permits)) return true } } - override suspend fun acquire() { - val p = _availablePermits.getAndDecrement() + override suspend fun acquire(permits: Int) { + val p = _availablePermits.getAndAdd(-permits) if (p > 0) return // permit acquired addToQueueAndSuspend() } - override fun release() { - val p = incPermits() + override fun release(permits: Int) { + val p = incPermits(permits) if (p >= 0) return // no waiters resumeNextFromQueue() } - fun incPermits() = _availablePermits.getAndUpdate { cur -> - check(cur < permits) { "The number of released permits cannot be greater than $permits" } - cur + 1 + fun incPermits(delta: Int = 1) = _availablePermits.getAndUpdate { cur -> + check(cur + delta <= permits) { "The number of released permits cannot be greater than $permits" } + cur + delta } private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine sc@ { cont -> diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt index b4ff88b895..0612412d66 100644 --- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt @@ -150,6 +150,18 @@ class SemaphoreTest : TestBase() { assertTrue(semaphore.tryAcquire()) } + @Test + fun testMultipleAcquiredPermits() = runTest { + val semaphore = Semaphore(5, acquiredPermits = 1) + assertEquals(semaphore.availablePermits, 4) + semaphore.acquire(4) + assertEquals(semaphore.availablePermits, 0) + assertFalse(semaphore.tryAcquire()) + semaphore.release() + assertEquals(semaphore.availablePermits, 1) + assertTrue(semaphore.tryAcquire()) + } + @Test fun testReleaseAcquiredPermits() = runTest { val semaphore = Semaphore(5, acquiredPermits = 4) @@ -161,6 +173,17 @@ class SemaphoreTest : TestBase() { assertFalse(semaphore.tryAcquire()) } + @Test + fun testReleaseMultipleAcquiredPermits() = runTest { + val semaphore = Semaphore(5, acquiredPermits = 4) + assertEquals(semaphore.availablePermits, 1) + semaphore.release(2) + assertEquals(3, semaphore.availablePermits) + assertFailsWith { semaphore.release(3) } + repeat(3) { assertTrue(semaphore.tryAcquire()) } + assertFalse(semaphore.tryAcquire()) + } + @Test fun testIllegalArguments() { assertFailsWith { Semaphore(-1, 0) } From b6443444ddb05c426aa2075e87378f4768964332 Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Thu, 19 Sep 2019 00:16:23 +0300 Subject: [PATCH 2/8] Added semaphore arguments validation --- kotlinx-coroutines-core/common/src/sync/Semaphore.kt | 11 ++++++++++- .../common/test/sync/SemaphoreTest.kt | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 9597a989f6..77be5cee1b 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -39,6 +39,8 @@ public interface Semaphore { * Use [tryAcquire] to try acquire the given number of permits of this semaphore without suspension. * * @param permits the number of permits to acquire + * + * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. */ public suspend fun acquire(permits: Int = 1) @@ -47,14 +49,18 @@ public interface Semaphore { * * @param permits the number of permits to acquire * @return `true` if all permits were acquired, `false` otherwise. + * + * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. */ public fun tryAcquire(permits: Int = 1): Boolean /** * Releases the given number of permits, returning them into this semaphore. Resumes the first - * suspending acquirer if there is one at the point of invocation and the requested number of permits are available. + * suspending acquirer if there is one at the point of invocation and the requested number of permits is available. * * @param permits the number of permits to release + * + * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. * @throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. */ public fun release(permits: Int = 1) @@ -113,6 +119,7 @@ private class SemaphoreImpl( private val deqIdx = atomic(0L) override fun tryAcquire(permits: Int): Boolean { + require(permits > 0) { "The number of acquired permits must be greater than 0" } _availablePermits.loop { p -> if (p < permits) return false if (_availablePermits.compareAndSet(p, p - permits)) return true @@ -120,12 +127,14 @@ private class SemaphoreImpl( } override suspend fun acquire(permits: Int) { + require(permits > 0) { "The number of acquired permits must be greater than 0" } val p = _availablePermits.getAndAdd(-permits) if (p > 0) return // permit acquired addToQueueAndSuspend() } override fun release(permits: Int) { + require(permits > 0) { "The number of released permits must be greater than 0" } val p = incPermits(permits) if (p >= 0) return // no waiters resumeNextFromQueue() diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt index 0612412d66..e959ceb15e 100644 --- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt @@ -185,10 +185,14 @@ class SemaphoreTest : TestBase() { } @Test - fun testIllegalArguments() { + fun testIllegalArguments() = runTest { assertFailsWith { Semaphore(-1, 0) } assertFailsWith { Semaphore(0, 0) } assertFailsWith { Semaphore(1, -1) } assertFailsWith { Semaphore(1, 2) } + val semaphore = Semaphore(1) + assertFailsWith { semaphore.acquire(-1) } + assertFailsWith { semaphore.tryAcquire(-1) } + assertFailsWith { semaphore.release(-1) } } } \ No newline at end of file From 585e75c09ba973a872b7f1da90c32fc37e7c59b3 Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Thu, 19 Sep 2019 01:24:28 +0300 Subject: [PATCH 3/8] Fixed case when multiple releases should resume multiple acquirers --- .../common/src/sync/Semaphore.kt | 4 ++- .../common/test/sync/SemaphoreTest.kt | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 77be5cee1b..8794e78c2a 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -137,7 +137,9 @@ private class SemaphoreImpl( require(permits > 0) { "The number of released permits must be greater than 0" } val p = incPermits(permits) if (p >= 0) return // no waiters - resumeNextFromQueue() + repeat(permits) { + resumeNextFromQueue() + } } fun incPermits(delta: Int = 1) = _availablePermits.getAndUpdate { cur -> diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt index e959ceb15e..423ddc27a3 100644 --- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt @@ -114,6 +114,34 @@ class SemaphoreTest : TestBase() { assertEquals(1, semaphore.availablePermits) } + @Test + fun testMultipleReleasesResumeMultipleAcquirers() = runTest { + val permits = 5 + val semaphore = Semaphore(permits) + semaphore.acquire(permits) + assertEquals(0, semaphore.availablePermits) + val jobs = mutableListOf>() + for (i in 0 until permits) { + jobs += async { + expect(2 + i) + assertFalse(semaphore.tryAcquire()) + semaphore.acquire() + expect(2 + permits + i) + return@async true + } + } + expect(1) + yield() + semaphore.release(permits) + jobs.forEach { + assertTrue(it.await()) + } + assertEquals(0, semaphore.availablePermits) + semaphore.release(permits) + assertEquals(permits, semaphore.availablePermits) + finish(1 + permits + permits + 1) // first + two iterations + last + } + @Test fun testCancellationDoesNotResumeWaitingAcquirers() = runTest { val semaphore = Semaphore(1) From 3d50ea4cf6708294d97b0766b77c1dea99d46722 Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Fri, 20 Sep 2019 21:15:34 +0300 Subject: [PATCH 4/8] Fixed the case when one release resumes an acquire even if permits are not enough --- .../common/src/sync/Semaphore.kt | 18 ++++++++++----- .../common/test/sync/SemaphoreTest.kt | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 8794e78c2a..8d6c62b508 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -107,8 +107,8 @@ private class SemaphoreImpl( * and the maximum number of waiting acquirers cannot be greater than 2^31 in any * real application. */ - private val _availablePermits = atomic(permits - acquiredPermits) - override val availablePermits: Int get() = max(_availablePermits.value, 0) + private val permitsBalance = atomic(permits - acquiredPermits) + override val availablePermits: Int get() = max(permitsBalance.value, 0) // The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`; // each segment contains a fixed number of slots. To determine a slot for each enqueue @@ -118,11 +118,16 @@ private class SemaphoreImpl( private val enqIdx = atomic(0L) private val deqIdx = atomic(0L) + /** + * The remaining permits from release operations, which could not be spent, because the next slot was not defined + */ + internal val accumulator = atomic(0) + override fun tryAcquire(permits: Int): Boolean { require(permits > 0) { "The number of acquired permits must be greater than 0" } - _availablePermits.loop { p -> + permitsBalance.loop { p -> if (p < permits) return false - if (_availablePermits.compareAndSet(p, p - permits)) return true + if (permitsBalance.compareAndSet(p, p - permits)) return true } } @@ -142,12 +147,13 @@ private class SemaphoreImpl( } } - fun incPermits(delta: Int = 1) = _availablePermits.getAndUpdate { cur -> + internal fun incPermits(delta: Int = 1) = permitsBalance.getAndUpdate { cur -> + assert { delta >= 1 } check(cur + delta <= permits) { "The number of released permits cannot be greater than $permits" } cur + delta } - private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine sc@ { cont -> + private suspend fun tryToAddToQueue(permits: Int) = suspendAtomicCancellableCoroutine sc@{ cont -> val last = this.tail val enqIdx = enqIdx.getAndIncrement() val segment = getSegment(last, enqIdx / SEGMENT_SIZE) diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt index 423ddc27a3..bb17e1f9a7 100644 --- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt @@ -142,6 +142,28 @@ class SemaphoreTest : TestBase() { finish(1 + permits + permits + 1) // first + two iterations + last } + @Test + fun testSingleReleaseDoesNotResumeMultipleAcquirers() = runTest { + val permits = 5 + val semaphore = Semaphore(permits, permits) + assertEquals(0, semaphore.availablePermits) + var criticalSection = false + val job = launch { + semaphore.acquire(permits) + criticalSection = true + } + assertEquals(0, semaphore.availablePermits) + repeat(permits - 1) { + semaphore.release() + assertEquals(false, criticalSection) + assertEquals(it + 1, semaphore.availablePermits) + } + semaphore.release() + job.join() + assertEquals(true, criticalSection) + assertEquals(0, semaphore.availablePermits) + } + @Test fun testCancellationDoesNotResumeWaitingAcquirers() = runTest { val semaphore = Semaphore(1) From c86260d2320958a51644d013c93643b38bc17e7b Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Fri, 20 Sep 2019 22:17:01 +0300 Subject: [PATCH 5/8] Fixed the case when one release resumes an acquire even if permits are not enough --- .../common/src/sync/Semaphore.kt | 205 +++++++++++++----- .../common/test/sync/SemaphoreTest.kt | 49 +++-- 2 files changed, 183 insertions(+), 71 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 8d6c62b508..350f5cf9ac 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -133,18 +133,16 @@ private class SemaphoreImpl( override suspend fun acquire(permits: Int) { require(permits > 0) { "The number of acquired permits must be greater than 0" } - val p = _availablePermits.getAndAdd(-permits) - if (p > 0) return // permit acquired - addToQueueAndSuspend() + val p = permitsBalance.getAndAdd(-permits) + if (p >= permits) return // permits are acquired + tryToAddToQueue(permits) } override fun release(permits: Int) { require(permits > 0) { "The number of released permits must be greater than 0" } val p = incPermits(permits) if (p >= 0) return // no waiters - repeat(permits) { - resumeNextFromQueue() - } + tryToResumeFromQueue(permits) } internal fun incPermits(delta: Int = 1) = permitsBalance.getAndUpdate { cur -> @@ -155,79 +153,170 @@ private class SemaphoreImpl( private suspend fun tryToAddToQueue(permits: Int) = suspendAtomicCancellableCoroutine sc@{ cont -> val last = this.tail - val enqIdx = enqIdx.getAndIncrement() - val segment = getSegment(last, enqIdx / SEGMENT_SIZE) - val i = (enqIdx % SEGMENT_SIZE).toInt() - if (segment === null || segment.get(i) === RESUMED || !segment.cas(i, null, cont)) { - // already resumed + val enqueueId = enqIdx.getAndIncrement() + val segmentId = enqueueId / SEGMENT_SIZE + val segment = getSegment(last, segmentId) + if (segment == null) { + // The segment is already removed + // Probably, this is the unreachable case cont.resume(Unit) - return@sc + } else { + val slotId = (enqueueId % SEGMENT_SIZE).toInt() + val prevSlot = segment.slots[slotId].getAndSet(Slot(State.SUSPEND, permits, cont)) + // The assertion is true, cause [RESUMED] can be set up only after [SUSPEND] + // and [CANCELLED] can be set up only in the handler, which will be added next + assert { prevSlot == null } + cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, slotId, permits).asHandler) } - cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, i).asHandler) + // Help to resume slots, if accumulator has permits + tryToResumeFromQueue(0) } - @Suppress("UNCHECKED_CAST") - internal fun resumeNextFromQueue() { - try_again@while (true) { - val first = this.head - val deqIdx = deqIdx.getAndIncrement() - val segment = getSegmentAndMoveHead(first, deqIdx / SEGMENT_SIZE) ?: continue@try_again - val i = (deqIdx % SEGMENT_SIZE).toInt() - val cont = segment.getAndSet(i, RESUMED) - if (cont === null) return // just resumed - if (cont === CANCELLED) continue@try_again - (cont as CancellableContinuation).resume(Unit) + internal fun tryToResumeFromQueue(permits: Int) { + accumulator.getAndAdd(permits) // add thread permits to common accumulator + var remain = accumulator.getAndSet(0) // try to take possession of all the accumulated permits at the moment + if (remain == 0) { + // another thread stole permits return } + try_again@ while (true) { + val first = this.head + val dequeueId = deqIdx.value + val segmentId = dequeueId / SEGMENT_SIZE + val segment = getSegmentAndMoveHead(first, segmentId) + if (segment == null) { + // The segment is already removed + // Try to help to increment [deqIdx] once, because multiple threads can increment the [deqIdx] in parallel otherwise + deqIdx.compareAndSet(dequeueId, dequeueId + 1) + continue@try_again + } + val slotId = (dequeueId % SEGMENT_SIZE).toInt() + val slot = segment.slots[slotId].value + if (slot == null) { + // If the slot is not defined yet we can't spent permits for it, so return [remain] to [accumulator] + accumulator.addAndGet(remain) + return + } + if (slot.state == State.CANCELLED) { + // The slot was cancelled in the another thread + // Try to help to increment [deqIdx] once, because multiple threads can increment the [deqIdx] in parallel otherwise + if (deqIdx.compareAndSet(dequeueId, dequeueId + 1)) { + removeSegmentIfNeeded(segment, dequeueId + 1) + } + continue@try_again + } + if (slot.state == State.RESUMED) { + assert { slot.permits == 0 } + // The slot was updated in the another thread + // The another thread was supposed to increment [deqIdx] + continue@try_again + } + val diff = min(slot.permits, remain) // How many permits we can spent for the slot at most + val newPermits = slot.permits - diff + val newState = if (newPermits == 0) State.RESUMED else slot.state + val newSlot = Slot(newState, newPermits, slot.cont) + if (!segment.slots[slotId].compareAndSet(slot, newSlot)) { + // The slot was updated in another thread, let's try again + continue + } + // Here we successfully updated the slot + remain -= diff // remove spent permits + if (newState == State.RESUMED) { + slot.cont.resume(Unit) + removeSegmentIfNeeded(segment, deqIdx.incrementAndGet()) + } + if (remain == 0) { + // We spent all available permits, so let's finish + return + } + // We still have permits, so we continue to spent them + } } + + /** + * Remove the segment if needed. The method checks, that all segment's slots were processed + * + * @param segment the segment to validation + * @param dequeueId the current dequeue operation ID + */ + internal fun removeSegmentIfNeeded(segment: SemaphoreSegment, dequeueId: Long) { + val slotId = (dequeueId % SEGMENT_SIZE).toInt() + if (slotId == SEGMENT_SIZE) { + segment.remove() + } + } + + override fun toString(): String { + return "Semaphore=(balance=${permitsBalance.value}, accumulator=${accumulator.value})" + } +} + +private enum class State { + SUSPEND, + RESUMED, + CANCELLED } +private data class Slot( + val state: State, + /** + * Remaining permits to resume slot + */ + val permits: Int, + val cont: CancellableContinuation +) { + init { + assert { permits >= 0 } + assert { permits != 0 || state == State.RESUMED } + } + + override fun toString(): String { + return "Slot($state, $permits)" + } +} + +/** + * Cleans the acquirer slot located by the specified index and removes this segment physically if all slots are cleaned. + */ private class CancelSemaphoreAcquisitionHandler( - private val semaphore: SemaphoreImpl, - private val segment: SemaphoreSegment, - private val index: Int + private val semaphore: SemaphoreImpl, + private val segment: SemaphoreSegment, + private val slotId: Int, + private val permits: Int ) : CancelHandler() { override fun invoke(cause: Throwable?) { - val p = semaphore.incPermits() + // Don't wait and use [prevSlot.permits] to handle permits, because it start races with release (see StressTest) + val p = semaphore.incPermits(permits) if (p >= 0) return - if (segment.cancel(index)) return - semaphore.resumeNextFromQueue() + // Copy [slotId] to local variable to prevent exception: + // "Complex data flow is not allowed for calculation of an array element index at the point of loading the reference to this element." + val temp = slotId + val prevSlot = segment.slots[temp].getAndUpdate { Slot(State.CANCELLED, it!!.permits, it.cont) } + // The assertion is true, cause the slot has [SUSPEND] state at least + assert { prevSlot != null } + + // Remove this segment if needed + if (segment.cancelledSlots.incrementAndGet() == SEGMENT_SIZE) { + segment.remove() + } + if (prevSlot!!.state == State.RESUMED) { + // The slot has already resumed, so return free permits to semaphore + semaphore.tryToResumeFromQueue(prevSlot.permits) + } } - override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $index]" + override fun toString() = "CancelSemaphoreAcquisitionHandler[$semaphore, $segment, $slotId]" } -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?): Segment(id, prev) { - val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - private val cancelledSlots = atomic(0) +private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { + val slots = atomicArrayOfNulls(SEGMENT_SIZE) + val cancelledSlots = atomic(0) override val removed get() = cancelledSlots.value == SEGMENT_SIZE - @Suppress("NOTHING_TO_INLINE") - inline fun get(index: Int): Any? = acquirers[index].value - - @Suppress("NOTHING_TO_INLINE") - inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) - - @Suppress("NOTHING_TO_INLINE") - inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - - // Cleans the acquirer slot located by the specified index - // and removes this segment physically if all slots are cleaned. - fun cancel(index: Int): Boolean { - // Try to cancel the slot - val cancelled = getAndSet(index, CANCELLED) !== RESUMED - // Remove this segment if needed - if (cancelledSlots.incrementAndGet() == SEGMENT_SIZE) - remove() - return cancelled + override fun toString(): String { + return "SemaphoreSegment(id=$id)" } - - override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" } -@SharedImmutable -private val RESUMED = Symbol("RESUMED") -@SharedImmutable -private val CANCELLED = Symbol("CANCELLED") @SharedImmutable private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt index bb17e1f9a7..25f18b81f0 100644 --- a/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt @@ -144,36 +144,59 @@ class SemaphoreTest : TestBase() { @Test fun testSingleReleaseDoesNotResumeMultipleAcquirers() = runTest { + val acquires = 5 val permits = 5 val semaphore = Semaphore(permits, permits) assertEquals(0, semaphore.availablePermits) - var criticalSection = false - val job = launch { - semaphore.acquire(permits) - criticalSection = true + val criticalSections = Array(acquires) { false } + val jobs = mutableListOf() + repeat(acquires) { i -> + jobs += launch { + expect(2 + i) + semaphore.acquire(permits) + criticalSections[i] = true + expect(2 + i + acquires) + } } + expect(1) + yield() assertEquals(0, semaphore.availablePermits) - repeat(permits - 1) { + fun testFairness(i: Int) { + for (k in 0 until i) { + assertEquals(true, criticalSections[k]) + } + for (k in i + 1 until acquires) { + assertEquals(false, criticalSections[k]) + } + } + repeat(acquires) { i -> + repeat(permits - 1) { + semaphore.release() + testFairness(i) + assertEquals(0, semaphore.availablePermits) + } semaphore.release() - assertEquals(false, criticalSection) - assertEquals(it + 1, semaphore.availablePermits) + jobs[i].join() + testFairness(i) + assertEquals(0, semaphore.availablePermits) } - semaphore.release() - job.join() - assertEquals(true, criticalSection) - assertEquals(0, semaphore.availablePermits) + semaphore.release(permits) + assertEquals(permits, semaphore.availablePermits) + finish(1 + acquires + acquires + 1) } @Test fun testCancellationDoesNotResumeWaitingAcquirers() = runTest { val semaphore = Semaphore(1) semaphore.acquire() - val job1 = launch { // 1st job in the waiting queue + val job1 = launch { + // 1st job in the waiting queue expect(2) semaphore.acquire() expectUnreached() } - val job2 = launch { // 2nd job in the waiting queue + val job2 = launch { + // 2nd job in the waiting queue expect(3) semaphore.acquire() expectUnreached() From cbd5bf2e0b3445057559eafc3399922155deea23 Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Sat, 21 Sep 2019 22:26:01 +0300 Subject: [PATCH 6/8] Optimization: removed Slot instance creation and removed extra accumulator processing --- .../common/src/sync/Semaphore.kt | 83 ++++++++----------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 350f5cf9ac..2f7c9bfa7a 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -4,7 +4,6 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* -import kotlin.jvm.* import kotlin.math.* /** @@ -162,8 +161,11 @@ private class SemaphoreImpl( cont.resume(Unit) } else { val slotId = (enqueueId % SEGMENT_SIZE).toInt() - val prevSlot = segment.slots[slotId].getAndSet(Slot(State.SUSPEND, permits, cont)) - // The assertion is true, cause [RESUMED] can be set up only after [SUSPEND] + val prevCont = segment.continuations[slotId].getAndSet(cont) + // It is safe to set continuation, because this slot is not defined yet, so another threads can not use it + assert { prevCont == null } + val prevSlot = segment.slots[slotId].getAndSet(permits) + // The assertion is true, cause [RESUMED] can be set up only after [SUSPENDED] // and [CANCELLED] can be set up only in the handler, which will be added next assert { prevSlot == null } cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(this, segment, slotId, permits).asHandler) @@ -173,10 +175,10 @@ private class SemaphoreImpl( } internal fun tryToResumeFromQueue(permits: Int) { - accumulator.getAndAdd(permits) // add thread permits to common accumulator - var remain = accumulator.getAndSet(0) // try to take possession of all the accumulated permits at the moment + val accumulated = accumulator.getAndSet(0) // try to take possession of all the accumulated permits at the moment + var remain = permits + accumulated if (remain == 0) { - // another thread stole permits + // The accumulator had not any permits or the another thread stole permits. Also this method called with zero permits. return } try_again@ while (true) { @@ -197,7 +199,7 @@ private class SemaphoreImpl( accumulator.addAndGet(remain) return } - if (slot.state == State.CANCELLED) { + if (slot == CANCELLED) { // The slot was cancelled in the another thread // Try to help to increment [deqIdx] once, because multiple threads can increment the [deqIdx] in parallel otherwise if (deqIdx.compareAndSet(dequeueId, dequeueId + 1)) { @@ -205,24 +207,21 @@ private class SemaphoreImpl( } continue@try_again } - if (slot.state == State.RESUMED) { - assert { slot.permits == 0 } + if (slot == RESUMED) { // The slot was updated in the another thread // The another thread was supposed to increment [deqIdx] continue@try_again } - val diff = min(slot.permits, remain) // How many permits we can spent for the slot at most - val newPermits = slot.permits - diff - val newState = if (newPermits == 0) State.RESUMED else slot.state - val newSlot = Slot(newState, newPermits, slot.cont) + val diff = min(slot, remain) // How many permits we can spent for the slot at most + val newSlot = slot - diff if (!segment.slots[slotId].compareAndSet(slot, newSlot)) { // The slot was updated in another thread, let's try again - continue + continue@try_again } // Here we successfully updated the slot remain -= diff // remove spent permits - if (newState == State.RESUMED) { - slot.cont.resume(Unit) + if (newSlot == RESUMED) { + segment.continuations[slotId].value!!.resume(Unit) removeSegmentIfNeeded(segment, deqIdx.incrementAndGet()) } if (remain == 0) { @@ -251,30 +250,6 @@ private class SemaphoreImpl( } } -private enum class State { - SUSPEND, - RESUMED, - CANCELLED -} - -private data class Slot( - val state: State, - /** - * Remaining permits to resume slot - */ - val permits: Int, - val cont: CancellableContinuation -) { - init { - assert { permits >= 0 } - assert { permits != 0 || state == State.RESUMED } - } - - override fun toString(): String { - return "Slot($state, $permits)" - } -} - /** * Cleans the acquirer slot located by the specified index and removes this segment physically if all slots are cleaned. */ @@ -285,23 +260,23 @@ private class CancelSemaphoreAcquisitionHandler( private val permits: Int ) : CancelHandler() { override fun invoke(cause: Throwable?) { - // Don't wait and use [prevSlot.permits] to handle permits, because it start races with release (see StressTest) + // Don't wait and use [prevSlot] to handle permits, because it starts races with release (see StressTest) val p = semaphore.incPermits(permits) if (p >= 0) return // Copy [slotId] to local variable to prevent exception: // "Complex data flow is not allowed for calculation of an array element index at the point of loading the reference to this element." val temp = slotId - val prevSlot = segment.slots[temp].getAndUpdate { Slot(State.CANCELLED, it!!.permits, it.cont) } - // The assertion is true, cause the slot has [SUSPEND] state at least + val prevSlot = segment.slots[temp].getAndSet(CANCELLED) + // The assertion is true, cause the slot has [SUSPENDED] state at least assert { prevSlot != null } // Remove this segment if needed if (segment.cancelledSlots.incrementAndGet() == SEGMENT_SIZE) { segment.remove() } - if (prevSlot!!.state == State.RESUMED) { - // The slot has already resumed, so return free permits to semaphore - semaphore.tryToResumeFromQueue(prevSlot.permits) + if (prevSlot == RESUMED) { + // The slot has already resumed, so return free permits to the semaphore + semaphore.tryToResumeFromQueue(prevSlot) } } @@ -309,7 +284,15 @@ private class CancelSemaphoreAcquisitionHandler( } private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment(id, prev) { - val slots = atomicArrayOfNulls(SEGMENT_SIZE) + val continuations = atomicArrayOfNulls>(SEGMENT_SIZE) + /** + * Each slot can contain one of following values: + * 1. A number greater than zero. It is [SUSPENDED] state; + * 2. Zero. It is [RESUMED] state; + * 3. "-1". It is [CANCELLED] state; + * 4. "null". The slot is not defined yet. + */ + val slots = atomicArrayOfNulls(SEGMENT_SIZE) val cancelledSlots = atomic(0) override val removed get() = cancelledSlots.value == SEGMENT_SIZE @@ -319,4 +302,8 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?) : Segment Date: Sat, 21 Mar 2020 17:28:58 +0300 Subject: [PATCH 7/8] Reverted the default value of "kotlinx.coroutines.semaphore.segmentSize" --- kotlinx-coroutines-core/common/src/sync/Semaphore.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 2f7c9bfa7a..9469dc3e01 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -306,4 +306,4 @@ private val RESUMED = 0 @SharedImmutable private val CANCELLED = -1 @SharedImmutable -private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 1) \ No newline at end of file +private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) \ No newline at end of file From ad4639c927f3873df4c81d787a0d5ccc6197114f Mon Sep 17 00:00:00 2001 From: Victor Khovanskiy Date: Sat, 21 Mar 2020 18:04:26 +0300 Subject: [PATCH 8/8] Fixed Semaphore binary compatibility --- .../api/kotlinx-coroutines-core.api | 9 ++++ .../common/src/sync/Semaphore.kt | 51 +++++++++++++++++-- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a6e5fd513e..dcf80f9d6e 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1147,10 +1147,19 @@ public final class kotlinx/coroutines/sync/MutexKt { } public abstract interface class kotlinx/coroutines/sync/Semaphore { + public abstract fun acquire (ILkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getAvailablePermits ()I public abstract fun release ()V + public abstract fun release (I)V public abstract fun tryAcquire ()Z + public abstract fun tryAcquire (I)Z +} + +public final class kotlinx/coroutines/sync/Semaphore$DefaultImpls { + public static synthetic fun acquire$default (Lkotlinx/coroutines/sync/Semaphore;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static synthetic fun release$default (Lkotlinx/coroutines/sync/Semaphore;IILjava/lang/Object;)V + public static synthetic fun tryAcquire$default (Lkotlinx/coroutines/sync/Semaphore;IILjava/lang/Object;)Z } public final class kotlinx/coroutines/sync/SemaphoreKt { diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 5ab8608fb1..32342ac182 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -26,6 +26,24 @@ public interface Semaphore { */ public val availablePermits: Int + /** + * Acquires a permit from this semaphore, suspending until one is available. + * All suspending acquirers are processed in first-in-first-out (FIFO) order. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this + * function is suspended, this function immediately resumes with [CancellationException]. + * + * *Cancellation of suspended semaphore acquisition is atomic* -- when this function + * throws [CancellationException] it means that the semaphore was not acquired. + * + * Note, that this function does not check for cancellation when it does not suspend. + * Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically + * check for cancellation in tight loops if needed. + * + * Use [tryAcquire] to try acquire a permit of this semaphore without suspension. + */ + public suspend fun acquire() + /** * Acquires the given number of permits from this semaphore, suspending until ones are available. * All suspending acquirers are processed in first-in-first-out (FIFO) order. @@ -46,7 +64,14 @@ public interface Semaphore { * * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. */ - public suspend fun acquire(permits: Int = 1) + public suspend fun acquire(permits: Int) + + /** + * Tries to acquire a permit from this semaphore without suspension. + * + * @return `true` if a permit was acquired, `false` otherwise. + */ + public fun tryAcquire(): Boolean /** * Tries to acquire the given number of permits from this semaphore without suspension. @@ -56,7 +81,15 @@ public interface Semaphore { * * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. */ - public fun tryAcquire(permits: Int = 1): Boolean + public fun tryAcquire(permits: Int): Boolean + + /** + * Releases a permit, returning it into this semaphore. Resumes the first + * suspending acquirer if there is one at the point of invocation and the requested number of permits is available. + * + * @throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. + */ + public fun release() /** * Releases the given number of permits, returning them into this semaphore. Resumes the first @@ -67,7 +100,7 @@ public interface Semaphore { * @throws [IllegalArgumentException] if [permits] is less than or equal to zero. * @throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire]. */ - public fun release(permits: Int = 1) + public fun release(permits: Int) } /** @@ -127,6 +160,10 @@ private class SemaphoreImpl( */ internal val accumulator = atomic(0) + override fun tryAcquire(): Boolean { + return tryAcquire(1) + } + override fun tryAcquire(permits: Int): Boolean { require(permits > 0) { "The number of acquired permits must be greater than 0" } permitsBalance.loop { p -> @@ -135,6 +172,10 @@ private class SemaphoreImpl( } } + override suspend fun acquire() { + return acquire(1) + } + override suspend fun acquire(permits: Int) { require(permits > 0) { "The number of acquired permits must be greater than 0" } val p = permitsBalance.getAndAdd(-permits) @@ -142,6 +183,10 @@ private class SemaphoreImpl( tryToAddToQueue(permits) } + override fun release() { + release(1) + } + override fun release(permits: Int) { require(permits > 0) { "The number of released permits must be greater than 0" } val p = incPermits(permits)