Skip to content

Commit 275a0ad

Browse files
committed
Implement permit release also for LimitedDispatcher
PermitTransfer is extracted to be used both in CoroutineScheduler and in LimitedDispatcher. BlockingDispatchAware interface is introduced for LimitedDispatcher.Worker to be accounted by CoroutineScheduler. Kotlin#3983 / IJPL-721
1 parent f54d9d0 commit 275a0ad

File tree

9 files changed

+184
-33
lines changed

9 files changed

+184
-33
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package kotlinx.coroutines.internal
2+
3+
4+
internal interface BlockingDispatchAware {
5+
fun beforeDispatchElsewhere()
6+
fun afterDispatchBack()
7+
}

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+25-1
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,14 @@ internal class LimitedDispatcher(
103103
* actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
104104
* perform any more dispatches.
105105
*/
106-
private inner class Worker(private var currentTask: Runnable) : Runnable {
106+
private inner class Worker(private var currentTask: Runnable) : Runnable, BlockingDispatchAware {
107107
override fun run() {
108108
var fairnessCounter = 0
109109
while (true) {
110110
try {
111111
currentTask.run()
112+
} catch (e: WorkerPermitTransferCompleted) {
113+
return
112114
} catch (e: Throwable) {
113115
handleCoroutineException(EmptyCoroutineContext, e)
114116
}
@@ -122,7 +124,29 @@ internal class LimitedDispatcher(
122124
}
123125
}
124126
}
127+
128+
override fun beforeDispatchElsewhere() {
129+
// compensate while we are blocked
130+
val newWorker = Worker(Runnable {})
131+
dispatcher.dispatch(this@LimitedDispatcher, newWorker)
132+
}
133+
134+
override fun afterDispatchBack() {
135+
if (tryAllocateWorker()) return
136+
val permitTransfer = PermitTransfer()
137+
queue.addLast(
138+
permitTransfer.releaseFun { throw WorkerPermitTransferCompleted }
139+
.let { Runnable { it() } }
140+
)
141+
permitTransfer.acquire(
142+
tryAllocatePermit = ::tryAllocateWorker,
143+
deallocatePermit = { runningWorkers.decrementAndGet() }
144+
)
145+
}
146+
125147
}
148+
149+
private object WorkerPermitTransferCompleted : Throwable()
126150
}
127151

128152
// Save a few bytecode ops
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.atomicfu.*
4+
import kotlin.jvm.*
5+
6+
@JvmField
7+
internal val PERMIT_ACQUIRE_PARK_NS = systemProp(
8+
"kotlinx.coroutines.permit.acquire.ns", 250L * 1000 * 1000 // 250ms
9+
)
10+
11+
internal class PermitTransferStatus {
12+
private val status = atomic(false)
13+
fun check(): Boolean = status.value
14+
fun complete(): Boolean = status.compareAndSet(false, true)
15+
}
16+
17+
internal expect class PermitTransfer constructor() {
18+
/**
19+
* [releasePermit] may throw
20+
*/
21+
fun releaseFun(releasePermit: () -> Unit): () -> Unit
22+
23+
/**
24+
* [tryAllocatePermit] and [deallocatePermit] must not throw
25+
*/
26+
fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit)
27+
}
28+
29+
internal class BusyPermitTransfer {
30+
private val status = PermitTransferStatus()
31+
32+
fun releaseFun(releasePermit: () -> Unit): () -> Unit = {
33+
if (status.complete()) {
34+
releasePermit()
35+
}
36+
}
37+
38+
fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) {
39+
while (true) {
40+
if (status.check()) {
41+
return
42+
}
43+
if (tryAllocatePermit()) {
44+
if (!status.complete()) { // race: transfer was completed first by releaseFun
45+
deallocatePermit()
46+
}
47+
return
48+
}
49+
}
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package kotlinx.coroutines.internal
2+
3+
internal actual typealias PermitTransfer = BusyPermitTransfer // TODO
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package kotlinx.coroutines.internal
2+
3+
import java.util.concurrent.locks.*
4+
5+
internal actual class PermitTransfer {
6+
private val status = PermitTransferStatus()
7+
8+
public actual fun releaseFun(releasePermit: () -> Unit): () -> Unit {
9+
val blockedThread = Thread.currentThread()
10+
return {
11+
if (status.complete()) {
12+
try {
13+
releasePermit()
14+
} finally {
15+
LockSupport.unpark(blockedThread)
16+
}
17+
}
18+
}
19+
}
20+
21+
public actual fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) {
22+
while (true) {
23+
if (status.check()) {
24+
return
25+
}
26+
if (tryAllocatePermit()) {
27+
if (!status.complete()) { // race: transfer was completed first by releaseFun
28+
deallocatePermit()
29+
}
30+
return
31+
}
32+
LockSupport.parkNanos(
33+
PERMIT_ACQUIRE_PARK_NS // todo: not sure if it's needed at all, I mean that it is < Long.MAX_VALUE, but at least this way it's safer
34+
)
35+
}
36+
}
37+
}

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+28-28
Original file line numberDiff line numberDiff line change
@@ -703,37 +703,18 @@ internal class CoroutineScheduler(
703703
assert { state == WorkerState.BLOCKING }
704704
decrementBlockingTasks()
705705
if (tryAcquireCpuPermit()) return
706-
class CpuPermitTransfer {
707-
private val status = atomic(false)
708-
fun check(): Boolean = status.value
709-
fun complete(): Boolean = status.compareAndSet(false, true)
710-
}
711-
val permitTransfer = CpuPermitTransfer()
712-
val blockedWorker = this@Worker
713-
scheduler.dispatch(Runnable {
706+
val permitTransfer = PermitTransfer()
707+
scheduler.dispatch(permitTransfer.releaseFun {
714708
// this code runs in a different worker thread that holds a CPU token
715709
val cpuHolder = currentThread() as Worker
716710
assert { cpuHolder.state == WorkerState.CPU_ACQUIRED }
717-
if (permitTransfer.complete()) {
718-
cpuHolder.state = WorkerState.BLOCKING
719-
LockSupport.unpark(blockedWorker)
720-
}
711+
cpuHolder.state = WorkerState.BLOCKING
721712
}, taskContext = NonBlockingContext)
722-
while (true) {
723-
if (permitTransfer.check()) {
724-
state = WorkerState.CPU_ACQUIRED
725-
break
726-
}
727-
if (tryAcquireCpuPermit()) {
728-
if (!permitTransfer.complete()) {
729-
// race: transfer was completed by another thread
730-
releaseCpuPermit()
731-
}
732-
assert { state == WorkerState.CPU_ACQUIRED }
733-
break
734-
}
735-
LockSupport.parkNanos(CPU_REACQUIRE_PARK_NS)
736-
}
713+
permitTransfer.acquire(
714+
tryAllocatePermit = this@CoroutineScheduler::tryAcquireCpuPermit,
715+
deallocatePermit = ::releaseCpuPermit
716+
)
717+
state = WorkerState.CPU_ACQUIRED
737718
}
738719

739720
override fun run() = runWorker()
@@ -841,14 +822,20 @@ internal class CoroutineScheduler(
841822

842823
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
843824

825+
private var currentTask: Task? = null
826+
844827
private fun executeTask(task: Task) {
845828
val taskMode = task.mode
846829
idleReset(taskMode)
847830
beforeTask(taskMode)
831+
currentTask = task
848832
runSafely(task)
833+
currentTask = null
849834
afterTask(taskMode)
850835
}
851836

837+
internal fun getCurrentTaskImpl(): TaskImpl? = currentTask as? TaskImpl
838+
852839
private fun beforeTask(taskMode: Int) {
853840
if (taskMode == TASK_NON_BLOCKING) return
854841
// Always notify about new work when releasing CPU-permit to execute some blocking task
@@ -1091,7 +1078,9 @@ internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &
10911078
*/
10921079
internal fun withUnlimitedIOScheduler(blocking: () -> Unit) {
10931080
withoutCpuPermit {
1094-
blocking()
1081+
withTaskBlockingDispatch {
1082+
blocking()
1083+
}
10951084
}
10961085
}
10971086

@@ -1104,3 +1093,14 @@ private fun withoutCpuPermit(body: () -> Unit) {
11041093
if (releasedPermit) worker.reacquireCpu()
11051094
}
11061095
}
1096+
1097+
private fun withTaskBlockingDispatch(body: () -> Unit) {
1098+
val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body()
1099+
val dispatchAware = worker.getCurrentTaskImpl()?.block as? BlockingDispatchAware ?: return body()
1100+
dispatchAware.beforeDispatchElsewhere()
1101+
try {
1102+
return body()
1103+
} finally {
1104+
dispatchAware.afterDispatchBack()
1105+
}
1106+
}

kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt

-4
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
1919
"kotlinx.coroutines.scheduler.resolution.ns", 100000L
2020
)
2121

22-
@JvmField
23-
internal val CPU_REACQUIRE_PARK_NS = systemProp(
24-
"kotlinx.coroutines.scheduler.cpu.reacquire.ns", 250L * 1000 * 1000
25-
)
2622

2723
/**
2824
* The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers.

kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt

+30
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotlinx.coroutines.*
44
import org.junit.*
55
import org.junit.rules.*
66
import java.util.concurrent.*
7+
import java.util.concurrent.atomic.*
78

89
class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
910

@@ -186,4 +187,33 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
186187
body(1)
187188
checkPoolThreadsCreated(maxDepth..maxDepth + 1)
188189
}
190+
191+
@Test
192+
fun testNoStarvationOfLimitedDispatcherWithRunBlocking() {
193+
val taskCount = 5
194+
val dispatcher = blockingDispatcher(2)
195+
val barrier = CompletableDeferred<Unit>()
196+
val count = AtomicInteger(0)
197+
fun blockingCode() {
198+
runBlocking {
199+
count.incrementAndGet()
200+
barrier.await()
201+
count.decrementAndGet()
202+
}
203+
}
204+
runBlocking {
205+
repeat(taskCount) {
206+
launch(dispatcher) {
207+
blockingCode()
208+
}
209+
}
210+
while (count.get() != taskCount) {
211+
Thread.sleep(1)
212+
}
213+
barrier.complete(Unit)
214+
while (count.get() != 0) {
215+
Thread.sleep(1)
216+
}
217+
}
218+
}
189219
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package kotlinx.coroutines.internal
2+
3+
internal actual typealias PermitTransfer = BusyPermitTransfer // TODO

0 commit comments

Comments
 (0)