Skip to content

Commit 8cee1fd

Browse files
committed
Preliminary prototype of #3439
1 parent 2006222 commit 8cee1fd

File tree

4 files changed

+162
-25
lines changed

4 files changed

+162
-25
lines changed

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

+10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ import kotlin.jvm.*
1313
* The result of .limitedParallelism(x) call, a dispatcher
1414
* that wraps the given dispatcher, but limits the parallelism level, while
1515
* trying to emulate fairness.
16+
*
17+
* ### Implementation details
18+
*
19+
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
20+
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
21+
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
22+
* in order to avoid starvation of the underlying dispatcher.
23+
*
24+
* Such invariant is crucial in order to be compatible with any underlying dispatcher implementation without
25+
* direct cooperation.
1626
*/
1727
internal class LimitedDispatcher(
1828
private val dispatcher: CoroutineDispatcher,

kotlinx-coroutines-core/jvm/src/EventLoop.kt

+60
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.coroutines.Runnable
8+
import kotlinx.coroutines.scheduling.*
9+
import kotlinx.coroutines.scheduling.CoroutineScheduler
10+
711
internal actual abstract class EventLoopImplPlatform: EventLoop() {
812
protected abstract val thread: Thread
913

@@ -45,6 +49,62 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
4549
*/
4650
@InternalCoroutinesApi
4751
public fun processNextEventInCurrentThread(): Long =
52+
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
53+
// (and, to avoid actual blocking, does something via this call), see #850
4854
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
4955

5056
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
57+
58+
/**
59+
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
60+
* Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
61+
* (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
62+
*
63+
* ### Invariants
64+
*
65+
* - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
66+
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
67+
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
68+
* **are not** executed[1].
69+
* - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
70+
*
71+
* [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
72+
* and we cannot leave this method without leaving thread in its original state.
73+
*
74+
* ### Rationale
75+
*
76+
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
77+
* The following API is provided by IDEA core:
78+
* ```
79+
* runDecomposedTaskAndJoinIt { // <- non-suspending call
80+
* // spawn as many task as needed
81+
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
82+
* }
83+
* ```
84+
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
85+
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
86+
* 'runDecomposedTaskAndJoinIt' should not only block, but also **help** executing the task or other tasks
87+
* until an arbitrary condition is satisfied.
88+
*
89+
* See #3439 for additional details.
90+
*
91+
* ### Limitations and caveats
92+
*
93+
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
94+
* - Is not 100% effective, it waits for [Long] returned nanoseconds even when work arrives immediately after returning
95+
* from this method
96+
* - When there is no more work, it's up to the caller to decide what to do. It's important to account that
97+
* work to current dispatcher may arrive **later** from external sources [1]
98+
*
99+
* [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
100+
* a tremendous effort.
101+
*
102+
* @throws IllegalStateException if the current thread is not system dispatcher thread
103+
*/
104+
@InternalCoroutinesApi
105+
@DelicateCoroutinesApi
106+
public fun runSingleTaskFromCurrentSystemDispatcher(): Long {
107+
val thread = Thread.currentThread()
108+
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
109+
return thread.runSingleTask()
110+
}

kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+36-8
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,28 @@ internal class CoroutineScheduler(
720720
tryReleaseCpu(WorkerState.TERMINATED)
721721
}
722722

723+
/**
724+
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
725+
* This is a fine-tailored method for a specific use-case not expected to be used widely.
726+
*/
727+
fun runSingleTask(): Long {
728+
val stateSnapshot = state
729+
val isCpuThread = state == WorkerState.CPU_ACQUIRED
730+
val task = if (isCpuThread) {
731+
findCpuTask()
732+
} else {
733+
findBlockingTask()
734+
}
735+
if (task == null) {
736+
if (minDelayUntilStealableTaskNs == 0L) return -1L
737+
return minDelayUntilStealableTaskNs
738+
}
739+
runSafely(task)
740+
if (!isCpuThread) decrementBlockingTasks()
741+
assert { state == stateSnapshot}
742+
return 0L
743+
}
744+
723745
// Counterpart to "tryUnpark"
724746
private fun tryPark() {
725747
if (!inStack()) {
@@ -879,9 +901,19 @@ internal class CoroutineScheduler(
879901
* * Check if our queue has one (maybe mixed in with CPU tasks)
880902
* * Poll global and try steal
881903
*/
904+
return findBlockingTask()
905+
}
906+
907+
private fun findBlockingTask(): Task? {
882908
return localQueue.pollBlocking()
883909
?: globalBlockingQueue.removeFirstOrNull()
884-
?: trySteal(blockingOnly = true)
910+
?: trySteal(STEAL_BLOCKING_ONLY)
911+
}
912+
913+
private fun findCpuTask(): Task? {
914+
return localQueue.pollCpu()
915+
?: globalBlockingQueue.removeFirstOrNull()
916+
?: trySteal(STEAL_CPU_ONLY)
885917
}
886918

887919
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
@@ -897,7 +929,7 @@ internal class CoroutineScheduler(
897929
} else {
898930
pollGlobalQueues()?.let { return it }
899931
}
900-
return trySteal(blockingOnly = false)
932+
return trySteal(STEAL_ANY)
901933
}
902934

903935
private fun pollGlobalQueues(): Task? {
@@ -910,7 +942,7 @@ internal class CoroutineScheduler(
910942
}
911943
}
912944

913-
private fun trySteal(blockingOnly: Boolean): Task? {
945+
private fun trySteal(stealingMode: StealingMode): Task? {
914946
val created = createdWorkers
915947
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
916948
if (created < 2) {
@@ -924,11 +956,7 @@ internal class CoroutineScheduler(
924956
if (currentIndex > created) currentIndex = 1
925957
val worker = workers[currentIndex]
926958
if (worker !== null && worker !== this) {
927-
val stealResult = if (blockingOnly) {
928-
worker.localQueue.tryStealBlocking(stolenTask)
929-
} else {
930-
worker.localQueue.trySteal(stolenTask)
931-
}
959+
val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
932960
if (stealResult == TASK_STOLEN) {
933961
val result = stolenTask.element
934962
stolenTask.element = null

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+56-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
1616
internal const val TASK_STOLEN = -1L
1717
internal const val NOTHING_TO_STEAL = -2L
1818

19+
internal typealias StealingMode = Int
20+
internal const val STEAL_ANY: StealingMode = -1
21+
internal const val STEAL_CPU_ONLY: StealingMode = 0
22+
internal const val STEAL_BLOCKING_ONLY: StealingMode = 1
23+
1924
/**
2025
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
2126
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
@@ -108,24 +113,34 @@ internal class WorkQueue {
108113
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
109114
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
110115
*/
111-
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
112-
val task = pollBuffer()
116+
// TODO move it to tests where appropriate
117+
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long = trySteal(STEAL_ANY, stolenTaskRef)
118+
119+
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
120+
val task = when (stealingMode) {
121+
STEAL_ANY -> pollBuffer()
122+
else -> stealWithExclusiveMode(stealingMode)
123+
}
124+
113125
if (task != null) {
114126
stolenTaskRef.element = task
115127
return TASK_STOLEN
116128
}
117-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
129+
return tryStealLastScheduled(stealingMode, stolenTaskRef)
118130
}
119131

120-
fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
132+
// Steal only tasks of a particular kind, potentially invoking full queue scan
133+
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
121134
var start = consumerIndex.value
122135
val end = producerIndex.value
123-
124-
while (start != end && blockingTasksInBuffer.value > 0) {
125-
stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue
126-
return TASK_STOLEN
136+
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
137+
// CPU or (BLOCKING & hasBlocking)
138+
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
139+
while (start != end && shouldProceed) {
140+
return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue
127141
}
128-
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)
142+
143+
return null
129144
}
130145

131146
// Polls for blocking task, invoked only by the owner
@@ -138,23 +153,41 @@ internal class WorkQueue {
138153
} // Failed -> someone else stole it
139154
}
140155

156+
return pollWithMode(onlyBlocking = true /* only blocking */)
157+
}
158+
159+
fun pollCpu(): Task? {
160+
while (true) { // Poll the slot
161+
val lastScheduled = lastScheduledTask.value ?: break
162+
if (lastScheduled.isBlocking) break
163+
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
164+
return lastScheduled
165+
} // Failed -> someone else stole it
166+
}
167+
168+
return pollWithMode(onlyBlocking = false /* only cpu */)
169+
}
170+
171+
private fun pollWithMode(/* Only blocking OR only CPU */onlyBlocking: Boolean): Task? {
141172
val start = consumerIndex.value
142173
var end = producerIndex.value
143-
144-
while (start != end && blockingTasksInBuffer.value > 0) {
145-
val task = tryExtractBlockingTask(--end)
174+
// CPU or (BLOCKING & hasBlocking)
175+
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
176+
while (start != end && shouldProceed) {
177+
val task = tryExtractFromTheMiddle(--end, onlyBlocking)
146178
if (task != null) {
147179
return task
148180
}
149181
}
150182
return null
151183
}
152184

153-
private fun tryExtractBlockingTask(index: Int): Task? {
185+
private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
186+
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
154187
val arrayIndex = index and MASK
155188
val value = buffer[arrayIndex]
156-
if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
157-
blockingTasksInBuffer.decrementAndGet()
189+
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
190+
if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
158191
return value
159192
}
160193
return null
@@ -170,10 +203,16 @@ internal class WorkQueue {
170203
/**
171204
* Contract on return value is the same as for [trySteal]
172205
*/
173-
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
206+
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
174207
while (true) {
175208
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
176-
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
209+
if (lastScheduled.isBlocking) {
210+
if (stealingMode == STEAL_CPU_ONLY) {
211+
return NOTHING_TO_STEAL
212+
}
213+
} else if (stealingMode == STEAL_BLOCKING_ONLY) {
214+
return NOTHING_TO_STEAL
215+
}
177216

178217
// TODO time wraparound ?
179218
val time = schedulerTimeSource.nanoTime()

0 commit comments

Comments
 (0)