Commit 03cca5b

Document blocking tasks machinery, get rid of an additional class
1 parent dd30af0 commit 03cca5b

2 files changed, +46 -35 lines changed


kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

+44 -23
@@ -20,49 +20,70 @@ import kotlin.random.*
  *
  * Current scheduler implementation has two optimization targets:
  * * Efficiency in the face of communication patterns (e.g., actors communicating via channel)
- * * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool
+ * * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool.
  *
  * ### Structural overview
  *
- * Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
- * to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
- * has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
- * Work-stealing is implemented on top of that queues to provide even load distribution and illusion of centralized run queue.
+ * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
+ * [maxPoolSize] lazily created threads to execute blocking tasks.
+ * Every worker has a local queue in addition to the global scheduler queue,
+ * and the global queue has priority over local queues to avoid starvation of externally-submitted
+ * (e.g. from Android UI thread) tasks.
+ * Work-stealing is implemented on top of these queues to provide
+ * even load distribution and the illusion of a centralized run queue.
  *
  * ### Scheduling policy
  *
  * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of worker run queue.
- * If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy,
+ * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
  * it effectively couples communicating coroutines into one and eliminates scheduling latency
- * that arises from placing task to the end of the queue.
- * Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack.
+ * that arises from placing tasks at the end of the queue.
+ * Placing the former head at the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates into a stack.
  * When a coroutine is dispatched from an external thread, it's put into the global queue.
  *
  * ### Work stealing and affinity
  *
- * To provide even tasks distribution worker tries to steal tasks from other workers queues before parking when his local queue is empty.
- * A non-standard solution is implemented to provide tasks affinity: task from FIFO buffer may be stolen only if it is stale enough
- * (based on the value of [WORK_STEALING_TIME_RESOLUTION_NS]).
- * For this purpose monotonic global clock ([System.nanoTime]) is used and every task has associated with it submission time.
- * This approach shows outstanding results when coroutines are cooperative, but as downside scheduler now depends on high-resolution global clock
+ * To provide even task distribution, a worker tries to steal tasks from other workers' queues
+ * before parking when its local queue is empty.
+ * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
+ * only if it is stale enough, based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
+ * For this purpose, a monotonic global clock is used, and every task has its submission time associated with it.
+ * This approach shows outstanding results when coroutines are cooperative,
+ * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines. Tasks from LIFO buffer can be stolen on a regular basis.
  *
  * ### Thread management
- * One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees similar
- * to the regular centralized executors. The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
- * The former field incorporates the amount of created threads, CPU-tokens and blocking tasks that require a thread compensation,
+ * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
+ * similar to those of regular centralized executors.
+ * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
+ * The former field incorporates the number of created threads, CPU tokens, and blocking tasks
+ * that require thread compensation,
 * while the latter represents intrusive versioned Treiber stack of idle workers.
- * When a worker cannot find any work, he first adds itself to the stack, then re-scans the queue (to avoid missing signal)
- * and then attempts to park itself (there is additional layer of signalling against unnecessary park/unpark).
- * If worker finds a task that it cannot yet steal due to timer constraints, it stores this fact in its state
+ * When a worker cannot find any work, it first adds itself to the stack,
+ * then re-scans the queue to avoid missed signals, and then attempts to park,
+ * with an additional rendezvous against unnecessary parking.
+ * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for such duration.
  *
- * When a new task arrives to the scheduler (whether it's local or global queue), either an idle worker is being signalled, or
- * a new worker is attempted to be created (only [corePoolSize] workers can be created for regular CPU tasks).
+ * When a new task arrives in the scheduler (whether to a local or the global queue),
+ * either an idle worker is signalled, or an attempt is made to create a new worker.
+ * Only [corePoolSize] workers can be created for regular CPU tasks.
  *
- * ### Dynamic resizing and support of blocking tasks
+ * ### Support for blocking tasks
+ * The scheduler also supports the notion of [blocking][TaskMode.PROBABLY_BLOCKING] tasks.
+ * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
+ * addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
+ * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
+ * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
+ * When a worker encounters a blocking task, it effectively hands off its permit to another thread (though not directly) to
+ * keep the invariant "the scheduler always has at least min(pending CPU tasks, core pool size)
+ * and at most core pool size threads to execute CPU tasks".
+ * To avoid overprovision, workers without a CPU permit are allowed to scan [globalBlockingQueue]
+ * and steal **only** blocking tasks from other workers.
  *
- * TODO
+ * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
+ * End users do not have access to the scheduler directly and can dispatch blocking tasks only via
+ * [LimitingDispatcher], which controls the concurrency level with its own mechanism.
  */
 @Suppress("NOTHING_TO_INLINE")
 internal class CoroutineScheduler(
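
The structural overview above is the core of the design: two queue levels, with the shared global queue checked first. The following Kotlin sketch models only that polling order; WorkerSketch and its queue types are illustrative stand-ins, not the scheduler's actual classes.

import java.util.ArrayDeque
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative model of the documented polling order: the global queue is
// consulted before the worker-local queue, so externally submitted tasks
// cannot be starved by a worker that keeps producing work for itself.
class WorkerSketch(
    private val globalQueue: ConcurrentLinkedQueue<Runnable>, // shared by all workers
    private val localQueue: ArrayDeque<Runnable>              // owned by this worker
) {
    fun findTask(): Runnable? {
        globalQueue.poll()?.let { return it } // global queue first: starvation avoidance
        localQueue.poll()?.let { return it }  // then the worker's own local queue
        return null                           // otherwise the worker would try to steal (see below)
    }
}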
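The scheduling policy (new task into the head slot, former head flushed to the tail) can be shown in a few lines. This is a hypothetical, single-threaded model; the real WorkQueue is a concurrent data structure.

import java.util.ArrayDeque

// Hypothetical model of the semi-FIFO policy described above: a task dispatched
// from inside a worker takes the head slot, and the previous head is flushed to
// the tail so the queue does not degenerate into a stack.
class LocalQueueSketch {
    private var lastScheduled: Runnable? = null // LIFO "head" slot
    private val buffer = ArrayDeque<Runnable>() // FIFO tail

    fun addFromWorker(task: Runnable) {
        val previousHead = lastScheduled
        lastScheduled = task                                   // newest task goes to the head
        if (previousHead != null) buffer.addLast(previousHead) // former head keeps FIFO order
    }

    fun poll(): Runnable? {
        lastScheduled?.let { lastScheduled = null; return it } // head has priority
        return buffer.pollFirst()
    }
}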
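The affinity rule reduces to one clock comparison per steal attempt. In this sketch, STEAL_RESOLUTION_NS and TimedTask are made-up stand-ins for WORK_STEALING_TIME_RESOLUTION_NS and the task's recorded submission time.

// Made-up constant standing in for WORK_STEALING_TIME_RESOLUTION_NS;
// the actual value used by the scheduler may differ.
const val STEAL_RESOLUTION_NS = 500_000L

// Hypothetical task wrapper recording the submission time the documentation mentions.
class TimedTask(val block: Runnable, val submissionTime: Long = System.nanoTime())

// A thief may take a task from a victim's FIFO buffer only if the task has been
// waiting for at least the steal resolution on the shared monotonic clock.
fun canSteal(task: TimedTask, now: Long = System.nanoTime()): Boolean =
    now - task.submissionTime >= STEAL_RESOLUTION_NS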
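The documentation says [controlState] packs the number of created threads, CPU tokens, and pending blocking tasks into a single field so they change atomically together. A sketch of such packing follows; the 21-bits-per-counter layout is an assumption for illustration, not necessarily the scheduler's actual layout.

import java.util.concurrent.atomic.AtomicLong

// Illustrative bit-field width and masks; assumed for the sketch only.
private const val BITS = 21
private const val MASK = (1L shl BITS) - 1

class ControlStateSketch {
    private val state = AtomicLong(0L)

    fun createdWorkers(s: Long = state.get()): Int = (s and MASK).toInt()
    fun cpuPermits(s: Long = state.get()): Int = ((s shr BITS) and MASK).toInt()
    fun blockingTasks(s: Long = state.get()): Int = ((s shr (2 * BITS)) and MASK).toInt()

    // Because all three counters live in one word, a single atomic update keeps
    // them mutually consistent, which decentralized decisions rely on.
    fun incrementCreatedWorkers(): Long = state.incrementAndGet()
    fun incrementBlockingTasks(): Long = state.addAndGet(1L shl (2 * BITS))
    fun decrementBlockingTasks(): Long = state.addAndGet(-(1L shl (2 * BITS)))
}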
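The park sequence (publish yourself as idle, re-scan, only then park) is what avoids missed signals. Below is a simplified model: a plain ConcurrentLinkedDeque stands in for the intrusive versioned Treiber stack, and hasPendingWork() is a hypothetical re-scan of the queues.

import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.locks.LockSupport

val parkedWorkers = ConcurrentLinkedDeque<Thread>() // simplified stand-in for parkedWorkersStack

fun hasPendingWork(): Boolean = false // stand-in for re-scanning the global and local queues

fun parkIfIdle() {
    parkedWorkers.push(Thread.currentThread())       // 1. publish "I am idle" first
    if (hasPendingWork()) {                          // 2. re-scan: a task enqueued concurrently
        parkedWorkers.remove(Thread.currentThread()) //    must not be missed
        return
    }
    LockSupport.park()                               // 3. park; a signaller pops the stack and unparks
}

fun signalWork() {
    parkedWorkers.poll()?.let { LockSupport.unpark(it) } // wake one idle worker, if any
}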
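The CPU-permits invariant can be mimicked with a plain Semaphore. This is only a conceptual sketch: the real scheduler keeps the permit count inside [controlState] rather than in a separate object, and CpuPermitsSketch and requestCpuWorker are invented names.

import java.util.concurrent.Semaphore

class CpuPermitsSketch(corePoolSize: Int) {
    // corePoolSize tokens; only holders may execute or steal CPU-bound tasks.
    private val cpuPermits = Semaphore(corePoolSize)

    fun runBlockingTask(task: Runnable, holdsCpuPermit: Boolean) {
        if (holdsCpuPermit) {
            cpuPermits.release() // hand the permit off before blocking...
            requestCpuWorker()   // ...and make sure another worker can pick it up
        }
        try {
            task.run()           // potentially long blocking call
        } finally {
            // Simplification: the real worker does not wait here; without a
            // permit it would continue serving blocking tasks only.
            cpuPermits.acquire()
        }
    }

    private fun requestCpuWorker() {
        // stand-in for "signal an idle worker or create a new one"
    }
}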

kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt

+2 -12

@@ -138,7 +138,7 @@ internal class WorkQueue {
     }
 
     fun offloadAllWorkTo(globalQueue: GlobalQueue) {
-        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
+        lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) }
         while (pollTo(globalQueue)) {
             // Steal everything
         }
@@ -173,7 +173,7 @@ internal class WorkQueue {
 
     private fun pollTo(queue: GlobalQueue): Boolean {
         val task = pollBuffer() ?: return false
-        queue.add(task)
+        queue.addLast(task)
         return true
     }
 
@@ -198,13 +198,3 @@ internal class WorkQueue {
         }
     }
 }
-
-private fun GlobalQueue.add(task: Task) {
-    /*
-     * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
-     * been already shutdown (with the only exception of the last worker thread that might be performing
-     * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
-     */
-    val added = addLast(task)
-    assert { added }
-}
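
The diff above inlines the former GlobalQueue.add wrapper (which asserted that the queue was not yet closed) into direct addLast calls in offloadAllWorkTo and pollTo. As a rough, self-contained model of what offloading does, with simplified stand-in types (a ConcurrentLinkedQueue in place of the real lock-free GlobalQueue):

import java.util.ArrayDeque
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-ins for Task/GlobalQueue; the real queues are lock-free.
class WorkQueueSketch {
    private val lastScheduledTask = AtomicReference<Runnable?>(null) // LIFO head slot
    private val buffer = ArrayDeque<Runnable>()                      // FIFO buffer

    fun offloadAllWorkTo(globalQueue: ConcurrentLinkedQueue<Runnable>) {
        // Drain the head slot first, then the buffer, appending at the tail of
        // the global queue so the buffer's relative order is preserved.
        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
        while (true) {
            val task = buffer.pollFirst() ?: break
            globalQueue.add(task)
        }
    }
}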
