-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathLimitedDispatcher.kt
135 lines (120 loc) · 5.4 KB
/
LimitedDispatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package kotlinx.coroutines.internal
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
/**
* The result of a `.limitedParallelism(x)` call: a dispatcher
* that wraps the given dispatcher, but limits the parallelism level, while
* trying to emulate fairness.
*
* ### Implementation details
*
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
* in order to avoid starvation of the underlying dispatcher.
*
* Such behavior is crucial to be compatible with any underlying dispatcher implementation without
* direct cooperation.
*/
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Atomic is necessary here for the sake of K/N memory ordering,
    // there is no need in atomic operations for this property
    private val runningWorkers = atomic(0)

    // Tasks submitted to this view; workers poll it rather than being handed tasks directly.
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        // A view with an equal or larger limit cannot restrict anything further:
        // reuse this dispatcher (renamed if requested) instead of stacking another wrapper.
        if (parallelism >= this.parallelism) return namedOrThis(name)
        return super.limitedParallelism(parallelism, name)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.safeDispatch(this, worker)
        }
    }

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatchYield(this, worker)
        }
    }

    /**
     * Tries to dispatch the given [block].
     * If there are not enough workers, it starts a new one via [startWorker].
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Add task to queue so running workers will be able to see that
        queue.addLast(block)
        // Fast path: the parallelism limit is already saturated, an existing worker will pick it up.
        if (runningWorkers.value >= parallelism) return
        // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
        // `runningWorkers` when they observed an empty queue.
        if (!tryAllocateWorker()) return
        // The task we just added may already have been consumed by a racing worker;
        // if nothing is left, the freshly allocated permit is released and no worker starts.
        val task = obtainTaskOrDeallocateWorker() ?: return
        startWorker(Worker(task))
    }

    /**
     * Tries to obtain the permit to start a new worker.
     */
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            // Recheck under the lock: a racing dispatch may have taken the last slot
            // between the lock-free check above and acquiring the lock.
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }

    /**
     * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            when (val nextTask = queue.removeFirstOrNull()) {
                null -> synchronized(workerAllocationLock) {
                    // Tentatively give up the permit; if a task slipped in concurrently,
                    // re-acquire it and loop to poll again, so no task is stranded
                    // in the queue without a worker to run it.
                    runningWorkers.decrementAndGet()
                    if (queue.size == 0) return null
                    runningWorkers.incrementAndGet()
                }
                else -> return nextTask
            }
        }
    }

    override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"

    /**
     * A worker that polls the queue and runs tasks until there are no more of them.
     *
     * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
     * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
     * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
     * perform any more dispatches.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            var fairnessCounter = 0
            while (true) {
                try {
                    currentTask.run()
                } catch (e: Throwable) {
                    // A failing task must not kill the worker loop: report it and keep polling.
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                // Obtain the next task *before* a possible fairness re-dispatch, so the worker
                // stays committed to a concrete task (see the class-level KDoc of Worker).
                currentTask = obtainTaskOrDeallocateWorker() ?: return
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
                    // Do "yield" to let other views execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
                    dispatcher.safeDispatch(this@LimitedDispatcher, this)
                    return
                }
            }
        }
    }
}
/**
 * Validates that this integer is a usable parallelism level.
 *
 * @throws IllegalArgumentException if the receiver is less than one.
 */
internal fun Int.checkParallelism() {
    require(this >= 1) { "Expected positive parallelism level, but got $this" }
}
/**
 * Wraps this dispatcher in a [NamedDispatcher] carrying [name], or returns
 * this dispatcher unchanged when no name is supplied.
 */
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher =
    if (name == null) this else NamedDispatcher(this, name)