Skip to content

Ability to specify the number of permits to acquire and release #1553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
72 changes: 47 additions & 25 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface Semaphore {
public val availablePermits: Int

/**
* Acquires a permit from this semaphore, suspending until one is available.
* Acquires the given number of permits from this semaphore, suspending until ones are available.
* All suspending acquirers are processed in first-in-first-out (FIFO) order.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
Expand All @@ -36,23 +36,34 @@ public interface Semaphore {
* Use [CoroutineScope.isActive] or [CoroutineScope.ensureActive] to periodically
* check for cancellation in tight loops if needed.
*
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
* Use [tryAcquire] to try acquire the given number of permits of this semaphore without suspension.
*
* @param permits the number of permits to acquire
*
* @throws [IllegalArgumentException] if [permits] is less than or equal to zero.
*/
public suspend fun acquire()
public suspend fun acquire(permits: Int = 1)

/**
* Tries to acquire a permit from this semaphore without suspension.
* Tries to acquire the given number of permits from this semaphore without suspension.
*
* @param permits the number of permits to acquire
* @return `true` if all permits were acquired, `false` otherwise.
*
* @return `true` if a permit was acquired, `false` otherwise.
* @throws [IllegalArgumentException] if [permits] is less than or equal to zero.
*/
public fun tryAcquire(): Boolean
public fun tryAcquire(permits: Int = 1): Boolean

/**
* Releases a permit, returning it into this semaphore. Resumes the first
* suspending acquirer if there is one at the point of invocation.
* Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
* Releases the given number of permits, returning them into this semaphore. Resumes the first
* suspending acquirer if there is one at the point of invocation and the requested number of permits is available.
*
* @param permits the number of permits to release
*
* @throws [IllegalArgumentException] if [permits] is less than or equal to zero.
* @throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
*/
public fun release()
public fun release(permits: Int = 1)
}

/**
Expand Down Expand Up @@ -96,8 +107,8 @@ private class SemaphoreImpl(
* and the maximum number of waiting acquirers cannot be greater than 2^31 in any
* real application.
*/
private val _availablePermits = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(_availablePermits.value, 0)
private val permitsBalance = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(permitsBalance.value, 0)

// The queue of waiting acquirers is essentially an infinite array based on `SegmentQueue`;
// each segment contains a fixed number of slots. To determine a slot for each enqueue
Expand All @@ -107,31 +118,42 @@ private class SemaphoreImpl(
private val enqIdx = atomic(0L)
private val deqIdx = atomic(0L)

override fun tryAcquire(): Boolean {
_availablePermits.loop { p ->
if (p <= 0) return false
if (_availablePermits.compareAndSet(p, p - 1)) return true
/**
* The remaining permits from release operations, which could not be spent, because the next slot was not defined
*/
internal val accumulator = atomic(0)

override fun tryAcquire(permits: Int): Boolean {
require(permits > 0) { "The number of acquired permits must be greater than 0" }
permitsBalance.loop { p ->
if (p < permits) return false
if (permitsBalance.compareAndSet(p, p - permits)) return true
}
}

override suspend fun acquire() {
val p = _availablePermits.getAndDecrement()
override suspend fun acquire(permits: Int) {
require(permits > 0) { "The number of acquired permits must be greater than 0" }
val p = _availablePermits.getAndAdd(-permits)
if (p > 0) return // permit acquired
addToQueueAndSuspend()
}

override fun release() {
val p = incPermits()
override fun release(permits: Int) {
require(permits > 0) { "The number of released permits must be greater than 0" }
val p = incPermits(permits)
if (p >= 0) return // no waiters
resumeNextFromQueue()
repeat(permits) {
resumeNextFromQueue()
}
}

fun incPermits() = _availablePermits.getAndUpdate { cur ->
check(cur < permits) { "The number of released permits cannot be greater than $permits" }
cur + 1
internal fun incPermits(delta: Int = 1) = permitsBalance.getAndUpdate { cur ->
assert { delta >= 1 }
check(cur + delta <= permits) { "The number of released permits cannot be greater than $permits" }
cur + delta
}

private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine<Unit> sc@ { cont ->
private suspend fun tryToAddToQueue(permits: Int) = suspendAtomicCancellableCoroutine<Unit> sc@{ cont ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how permits is used by this method. I'm very surprised that tests pass. Seems like some tests are missing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous commit didn't contain significant changes, sorry. Please, recheck.

val last = this.tail
val enqIdx = enqIdx.getAndIncrement()
val segment = getSegment(last, enqIdx / SEGMENT_SIZE)
Expand Down
79 changes: 78 additions & 1 deletion kotlinx-coroutines-core/common/test/sync/SemaphoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,56 @@ class SemaphoreTest : TestBase() {
assertEquals(1, semaphore.availablePermits)
}

@Test
fun testMultipleReleasesResumeMultipleAcquirers() = runTest {
val permits = 5
val semaphore = Semaphore(permits)
semaphore.acquire(permits)
assertEquals(0, semaphore.availablePermits)
val jobs = mutableListOf<Deferred<Boolean>>()
for (i in 0 until permits) {
jobs += async {
expect(2 + i)
assertFalse(semaphore.tryAcquire())
semaphore.acquire()
expect(2 + permits + i)
return@async true
}
}
expect(1)
yield()
semaphore.release(permits)
jobs.forEach {
assertTrue(it.await())
}
assertEquals(0, semaphore.availablePermits)
semaphore.release(permits)
assertEquals(permits, semaphore.availablePermits)
finish(1 + permits + permits + 1) // first + two iterations + last
}

@Test
fun testSingleReleaseDoesNotResumeMultipleAcquirers() = runTest {
val permits = 5
val semaphore = Semaphore(permits, permits)
assertEquals(0, semaphore.availablePermits)
var criticalSection = false
val job = launch {
semaphore.acquire(permits)
criticalSection = true
}
assertEquals(0, semaphore.availablePermits)
repeat(permits - 1) {
semaphore.release()
assertEquals(false, criticalSection)
assertEquals(it + 1, semaphore.availablePermits)
}
semaphore.release()
job.join()
assertEquals(true, criticalSection)
assertEquals(0, semaphore.availablePermits)
}

@Test
fun testCancellationDoesNotResumeWaitingAcquirers() = runTest {
val semaphore = Semaphore(1)
Expand Down Expand Up @@ -150,6 +200,18 @@ class SemaphoreTest : TestBase() {
assertTrue(semaphore.tryAcquire())
}

@Test
fun testMultipleAcquiredPermits() = runTest {
val semaphore = Semaphore(5, acquiredPermits = 1)
assertEquals(semaphore.availablePermits, 4)
semaphore.acquire(4)
assertEquals(semaphore.availablePermits, 0)
assertFalse(semaphore.tryAcquire())
semaphore.release()
assertEquals(semaphore.availablePermits, 1)
assertTrue(semaphore.tryAcquire())
}

@Test
fun testReleaseAcquiredPermits() = runTest {
val semaphore = Semaphore(5, acquiredPermits = 4)
Expand All @@ -162,10 +224,25 @@ class SemaphoreTest : TestBase() {
}

@Test
fun testIllegalArguments() {
fun testReleaseMultipleAcquiredPermits() = runTest {
val semaphore = Semaphore(5, acquiredPermits = 4)
assertEquals(semaphore.availablePermits, 1)
semaphore.release(2)
assertEquals(3, semaphore.availablePermits)
assertFailsWith<IllegalStateException> { semaphore.release(3) }
repeat(3) { assertTrue(semaphore.tryAcquire()) }
assertFalse(semaphore.tryAcquire())
}

@Test
fun testIllegalArguments() = runTest {
assertFailsWith<IllegalArgumentException> { Semaphore(-1, 0) }
assertFailsWith<IllegalArgumentException> { Semaphore(0, 0) }
assertFailsWith<IllegalArgumentException> { Semaphore(1, -1) }
assertFailsWith<IllegalArgumentException> { Semaphore(1, 2) }
val semaphore = Semaphore(1)
assertFailsWith<IllegalArgumentException> { semaphore.acquire(-1) }
assertFailsWith<IllegalArgumentException> { semaphore.tryAcquire(-1) }
assertFailsWith<IllegalArgumentException> { semaphore.release(-1) }
}
}