Skip to content

Commit 2bc6b26

Browse files
committed
Implement internal API for running current event loop
Fixes #3439
1 parent 747db9e commit 2bc6b26

File tree

7 files changed

+284
-25
lines changed

7 files changed

+284
-25
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
310310
}
311311

312312
public final class kotlinx/coroutines/EventLoopKt {
313+
public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z
313314
public static final fun processNextEventInCurrentThread ()J
315+
public static final fun runSingleTaskFromCurrentSystemDispatcher ()J
314316
}
315317

316318
public final class kotlinx/coroutines/ExceptionsKt {

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ import kotlin.jvm.*
1313
* The result of .limitedParallelism(x) call, a dispatcher
1414
* that wraps the given dispatcher, but limits the parallelism level, while
1515
* trying to emulate fairness.
16+
*
17+
* ### Implementation details
18+
*
19+
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
20+
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
21+
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
22+
* in order to avoid starvation of the underlying dispatcher.
23+
*
24+
* Such invariant is crucial in order to be compatible with any underlying dispatcher implementation without
25+
* direct cooperation.
1626
*/
1727
internal class LimitedDispatcher(
1828
private val dispatcher: CoroutineDispatcher,

kotlinx-coroutines-core/jvm/src/EventLoop.kt

+78
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.Runnable
8+
import kotlinx.coroutines.scheduling.*
9+
import kotlinx.coroutines.scheduling.CoroutineScheduler
10+
711
internal actual abstract class EventLoopImplPlatform: EventLoop() {
812
protected abstract val thread: Thread
913

@@ -45,6 +49,80 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
4549
*/
4650
@InternalCoroutinesApi
4751
public fun processNextEventInCurrentThread(): Long =
52+
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
53+
// (and, to avoid actual blocking, does something via this call), see #850
4854
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
4955

5056
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
57+
58+
/**
59+
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
60+
* Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
61+
* (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
62+
*
63+
* ### Invariants
64+
*
65+
* - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
66+
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
67+
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
68+
* **are not** executed[1].
69+
* - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
70+
*
71+
* [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
72+
* and we cannot leave this method without leaving thread in its original state.
73+
*
74+
* ### Rationale
75+
*
76+
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
77+
* The following API is provided by IDEA core:
78+
* ```
79+
* runDecomposedTaskAndJoinIt { // <- non-suspending call
80+
* // spawn as many task as needed
81+
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
82+
* }
83+
* ```
84+
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
85+
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
86+
* 'runDecomposedTaskAndJoinIt' should not only block, but also **help** executing the task or other tasks
87+
* until an arbitrary condition is satisfied.
88+
*
89+
* See #3439 for additional details.
90+
*
91+
* ### Limitations and caveats
92+
*
93+
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
94+
* - Is not 100% effective, because the caller should somehow "wait" (or do other work) during [Long] returned nanoseconds
95+
* even when work arrives immediately after returning from this method.
96+
* - When there is no more work, it's up to the caller to decide what to do. It's important to account that
97+
* work to current dispatcher may arrive **later** from external sources [1]
98+
*
99+
* [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
100+
* a tremendous effort.
101+
*
102+
* @throws IllegalStateException if the current thread is not system dispatcher thread
103+
*/
104+
@InternalCoroutinesApi
105+
@DelicateCoroutinesApi
106+
@PublishedApi
107+
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
108+
val thread = Thread.currentThread()
109+
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
110+
return thread.runSingleTask()
111+
}
112+
113+
/**
114+
* Checks whether the given thread belongs to Dispatchers.IO.
115+
* Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
116+
* may change this status when switching between tasks.
117+
*
118+
* This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
119+
* purposes, and is declared as an extension only to avoid top-level scope pollution.
120+
*/
121+
@InternalCoroutinesApi
122+
@DelicateCoroutinesApi
123+
@PublishedApi
124+
internal fun Thread.isIoDispatcherThread(): Boolean {
125+
if (this !is CoroutineScheduler.Worker) return false
126+
return isIo()
127+
}
128+

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+40-8
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,30 @@ internal class CoroutineScheduler(
720720
tryReleaseCpu(WorkerState.TERMINATED)
721721
}
722722

723+
/**
724+
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
725+
* This is a fine-tailored method for a specific use-case not expected to be used widely.
726+
*/
727+
fun runSingleTask(): Long {
728+
val stateSnapshot = state
729+
val isCpuThread = state == WorkerState.CPU_ACQUIRED
730+
val task = if (isCpuThread) {
731+
findCpuTask()
732+
} else {
733+
findBlockingTask()
734+
}
735+
if (task == null) {
736+
if (minDelayUntilStealableTaskNs == 0L) return -1L
737+
return minDelayUntilStealableTaskNs
738+
}
739+
runSafely(task)
740+
if (!isCpuThread) decrementBlockingTasks()
741+
assert { state == stateSnapshot}
742+
return 0L
743+
}
744+
745+
fun isIo() = state == WorkerState.BLOCKING
746+
723747
// Counterpart to "tryUnpark"
724748
private fun tryPark() {
725749
if (!inStack()) {
@@ -879,9 +903,21 @@ internal class CoroutineScheduler(
879903
* * Check if our queue has one (maybe mixed in with CPU tasks)
880904
* * Poll global and try steal
881905
*/
906+
return findBlockingTask()
907+
}
908+
909+
// NB: ONLY for runSingleTask method
910+
private fun findBlockingTask(): Task? {
882911
return localQueue.pollBlocking()
883912
?: globalBlockingQueue.removeFirstOrNull()
884-
?: trySteal(blockingOnly = true)
913+
?: trySteal(STEAL_BLOCKING_ONLY)
914+
}
915+
916+
// NB: ONLY for runSingleTask method
917+
private fun findCpuTask(): Task? {
918+
return localQueue.pollCpu()
919+
?: globalBlockingQueue.removeFirstOrNull()
920+
?: trySteal(STEAL_CPU_ONLY)
885921
}
886922

887923
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
@@ -897,7 +933,7 @@ internal class CoroutineScheduler(
897933
} else {
898934
pollGlobalQueues()?.let { return it }
899935
}
900-
return trySteal(blockingOnly = false)
936+
return trySteal(STEAL_ANY)
901937
}
902938

903939
private fun pollGlobalQueues(): Task? {
@@ -910,7 +946,7 @@ internal class CoroutineScheduler(
910946
}
911947
}
912948

913-
private fun trySteal(blockingOnly: Boolean): Task? {
949+
private fun trySteal(stealingMode: StealingMode): Task? {
914950
val created = createdWorkers
915951
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
916952
if (created < 2) {
@@ -924,11 +960,7 @@ internal class CoroutineScheduler(
924960
if (currentIndex > created) currentIndex = 1
925961
val worker = workers[currentIndex]
926962
if (worker !== null && worker !== this) {
927-
val stealResult = if (blockingOnly) {
928-
worker.localQueue.tryStealBlocking(stolenTask)
929-
} else {
930-
worker.localQueue.trySteal(stolenTask)
931-
}
963+
val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
932964
if (stealResult == TASK_STOLEN) {
933965
val result = stolenTask.element
934966
stolenTask.element = null

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+58-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
1616
internal const val TASK_STOLEN = -1L
1717
internal const val NOTHING_TO_STEAL = -2L
1818

19+
internal typealias StealingMode = Int
20+
internal const val STEAL_ANY: StealingMode = -1
21+
internal const val STEAL_CPU_ONLY: StealingMode = 0
22+
internal const val STEAL_BLOCKING_ONLY: StealingMode = 1
23+
1924
/**
2025
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
2126
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
@@ -107,25 +112,37 @@ internal class WorkQueue {
107112
*
108113
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
109114
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
115+
*
116+
* [StealingMode] controls what tasks to steal:
117+
* * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen
118+
* * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task which is used by scheduler when helping in Dispatchers.IO mode
119+
* * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
110120
*/
111-
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
112-
val task = pollBuffer()
121+
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
122+
val task = when (stealingMode) {
123+
STEAL_ANY -> pollBuffer()
124+
else -> stealWithExclusiveMode(stealingMode)
125+
}
126+
113127
if (task != null) {
114128
stolenTaskRef.element = task
115129
return TASK_STOLEN
116130
}
117-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
131+
return tryStealLastScheduled(stealingMode, stolenTaskRef)
118132
}
119133

120-
fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
134+
// Steal only tasks of a particular kind, potentially invoking full queue scan
135+
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
121136
var start = consumerIndex.value
122137
val end = producerIndex.value
123-
124-
while (start != end && blockingTasksInBuffer.value > 0) {
125-
stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue
126-
return TASK_STOLEN
138+
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
139+
// CPU or (BLOCKING & hasBlocking)
140+
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
141+
while (start != end && shouldProceed) {
142+
return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue
127143
}
128-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)
144+
145+
return null
129146
}
130147

131148
// Polls for blocking task, invoked only by the owner
@@ -138,23 +155,41 @@ internal class WorkQueue {
138155
} // Failed -> someone else stole it
139156
}
140157

158+
return pollWithMode(onlyBlocking = true /* only blocking */)
159+
}
160+
161+
fun pollCpu(): Task? {
162+
while (true) { // Poll the slot
163+
val lastScheduled = lastScheduledTask.value ?: break
164+
if (lastScheduled.isBlocking) break
165+
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
166+
return lastScheduled
167+
} // Failed -> someone else stole it
168+
}
169+
170+
return pollWithMode(onlyBlocking = false /* only cpu */)
171+
}
172+
173+
private fun pollWithMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
141174
val start = consumerIndex.value
142175
var end = producerIndex.value
143-
144-
while (start != end && blockingTasksInBuffer.value > 0) {
145-
val task = tryExtractBlockingTask(--end)
176+
// CPU or (BLOCKING & hasBlocking)
177+
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
178+
while (start != end && shouldProceed) {
179+
val task = tryExtractFromTheMiddle(--end, onlyBlocking)
146180
if (task != null) {
147181
return task
148182
}
149183
}
150184
return null
151185
}
152186

153-
private fun tryExtractBlockingTask(index: Int): Task? {
187+
private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
188+
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
154189
val arrayIndex = index and MASK
155190
val value = buffer[arrayIndex]
156-
if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
157-
blockingTasksInBuffer.decrementAndGet()
191+
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
192+
if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
158193
return value
159194
}
160195
return null
@@ -170,10 +205,16 @@ internal class WorkQueue {
170205
/**
171206
* Contract on return value is the same as for [trySteal]
172207
*/
173-
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
208+
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
174209
while (true) {
175210
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
176-
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
211+
if (lastScheduled.isBlocking) {
212+
if (stealingMode == STEAL_CPU_ONLY) {
213+
return NOTHING_TO_STEAL
214+
}
215+
} else if (stealingMode == STEAL_BLOCKING_ONLY) {
216+
return NOTHING_TO_STEAL
217+
}
177218

178219
// TODO time wraparound ?
179220
val time = schedulerTimeSource.nanoTime()

0 commit comments

Comments
 (0)