Skip to content

Commit f8994b6

Browse files
committed
Introduce withUnlimitedIOScheduler utility method to use instead of direct interaction with Worker
Kotlin#3983 / IJPL-721
1 parent 2f9d2e3 commit f8994b6

File tree

3 files changed

+28
-14
lines changed

3 files changed

+28
-14
lines changed

kotlinx-coroutines-core/jvm/src/Builders.kt

+4-9
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.coroutines.scheduling.*
8-
import kotlinx.coroutines.scheduling.CoroutineScheduler
98
import java.util.concurrent.locks.*
109
import kotlin.contracts.*
1110
import kotlin.coroutines.*
@@ -88,7 +87,6 @@ private class BlockingCoroutine<T>(
8887
@Suppress("UNCHECKED_CAST")
8988
fun joinBlocking(): T {
9089
registerTimeLoopThread()
91-
var cpuPermitReleased: Boolean? = null
9290
try {
9391
eventLoop?.incrementUseCount()
9492
try {
@@ -98,20 +96,17 @@ private class BlockingCoroutine<T>(
9896
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
9997
// note: process next even may loose unpark flag, so check if completed before parking
10098
if (isCompleted) break
101-
if (parkNanos > 0 && cpuPermitReleased == null) {
102-
val worker = Thread.currentThread() as? CoroutineScheduler.Worker
103-
cpuPermitReleased = worker?.releaseCpu() ?: false
99+
if (parkNanos > 0) {
100+
withUnlimitedIOScheduler {
101+
parkNanos(this, parkNanos)
102+
}
104103
}
105-
parkNanos(this, parkNanos)
106104
}
107105
} finally { // paranoia
108106
eventLoop?.decrementUseCount()
109107
}
110108
} finally { // paranoia
111109
unregisterTimeLoopThread()
112-
if (cpuPermitReleased == true) {
113-
(Thread.currentThread() as CoroutineScheduler.Worker).reacquireCpu()
114-
}
115110
}
116111
// now return result
117112
val state = this.state.unboxState()

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+22-3
Original file line numberDiff line numberDiff line change
@@ -690,15 +690,15 @@ internal class CoroutineScheduler(
690690
return hadCpu
691691
}
692692

693-
/** only for [runBlocking] */
693+
/** only for [withoutCpuPermit] */
694694
fun releaseCpu(): Boolean {
695695
assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING }
696696
return tryReleaseCpu(WorkerState.BLOCKING).also { released ->
697697
if (released) incrementBlockingTasks()
698698
}
699699
}
700700

701-
/** only for [runBlocking] */
701+
/** only for [withoutCpuPermit] */
702702
fun reacquireCpu() {
703703
assert { state == WorkerState.BLOCKING }
704704
decrementBlockingTasks()
@@ -732,7 +732,7 @@ internal class CoroutineScheduler(
732732
assert { state == WorkerState.CPU_ACQUIRED }
733733
break
734734
}
735-
LockSupport.parkNanos(RUN_BLOCKING_CPU_REACQUIRE_PARK_NS)
735+
LockSupport.parkNanos(CPU_REACQUIRE_PARK_NS)
736736
}
737737
}
738738

@@ -1085,3 +1085,22 @@ internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Wo
10851085
@JvmName("mayNotBlock")
10861086
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
10871087
thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
1088+
1089+
/**
1090+
* Emulates dispatch to [UnlimitedIoScheduler] in a blocking context.
1091+
*/
1092+
internal fun withUnlimitedIOScheduler(blocking: () -> Unit) {
1093+
withoutCpuPermit {
1094+
blocking()
1095+
}
1096+
}
1097+
1098+
private fun withoutCpuPermit(body: () -> Unit) {
1099+
val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body()
1100+
val releasedPermit = worker.releaseCpu()
1101+
try {
1102+
return body()
1103+
} finally {
1104+
if (releasedPermit) worker.reacquireCpu()
1105+
}
1106+
}

kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
2020
)
2121

2222
@JvmField
23-
internal val RUN_BLOCKING_CPU_REACQUIRE_PARK_NS = systemProp(
24-
"kotlinx.coroutines.scheduler.runBlocking.cpu.reacquire.ns", 250L * 1000 * 1000
23+
internal val CPU_REACQUIRE_PARK_NS = systemProp(
24+
"kotlinx.coroutines.scheduler.cpu.reacquire.ns", 250L * 1000 * 1000
2525
)
2626

2727
/**

0 commit comments

Comments
 (0)