/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * The result of a `.limitedParallelism(x)` call: a dispatcher that wraps the given dispatcher
 * but limits its parallelism level, while trying to emulate fairness.
 *
 * ### Implementation details
 *
 * By design, `LimitedDispatcher` never [dispatches][CoroutineDispatcher.dispatch] the originally submitted tasks
 * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
 * dispatches at most [parallelism] "worker-loop" tasks that poll that queue and cooperatively preempt themselves
 * in order to avoid starvation of the underlying dispatcher.
 *
 * Such behavior is crucial for compatibility with any underlying dispatcher implementation without
 * direct cooperation.
 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Atomic is necessary here only for the sake of K/N memory ordering;
    // otherwise there is no need for atomic operations on this property
    private val runningWorkers = atomic(0)

    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        // Requesting a limit that is not stricter than the current one is a no-op
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }
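
    /*
     * The worker loop. Each instance of this Runnable that gets dispatched to the underlying
     * dispatcher acts as a single "worker": it repeatedly drains the shared queue until the
     * queue is empty or the fairness threshold is reached.
     */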
    override fun run() {
        var fairnessCounter = 0
        while (true) {
            val task = queue.removeFirstOrNull()
            if (task != null) {
                try {
                    task.run()
                } catch (e: Throwable) {
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                    // Do a "yield" to let other views execute their runnables as well
                    // Note that we do not decrement 'runningWorkers' as we are still committed to doing our part of the work
                    dispatcher.dispatch(this, this)
                    return
                }
                continue
            }

            synchronized(workerAllocationLock) {
                runningWorkers.decrementAndGet()
                if (queue.size == 0) return
                runningWorkers.incrementAndGet()
                fairnessCounter = 0
            }
        }
    }
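
    /*
     * Dispatching to this dispatcher only enqueues the block into the local queue and,
     * if fewer than [parallelism] workers are currently running, starts one more worker
     * on the underlying dispatcher. The block itself is never handed to the underlying
     * dispatcher directly.
     */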
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) {
            dispatcher.dispatch(this, this)
        }
    }

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) {
            dispatcher.dispatchYield(this, this)
        }
    }

    private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
        // Add the task to the queue so that running workers will be able to see it
        if (addAndTryDispatching(block)) return
        /*
         * Protect against the race when the number of workers is enough,
         * but one of them (because of synchronized serialization) attempts to complete,
         * and we have just observed a number of running workers smaller than the actual
         * one (hit right between `--runningWorkers` and `++runningWorkers` in `run()`)
         */
        if (!tryAllocateWorker()) return
        dispatch()
    }

    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            // Re-check under the lock: the racy observation made in addAndTryDispatching may be stale
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }
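
    /*
     * Adds the task to the queue and returns `true` if no extra worker needs to be dispatched,
     * i.e. if enough workers already appear to be running. The check is intentionally racy:
     * a stale observation is re-validated under `workerAllocationLock` in `tryAllocateWorker`.
     */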
    private fun addAndTryDispatching(block: Runnable): Boolean {
        queue.addLast(block)
        return runningWorkers.value >= parallelism
    }
}

// Save a few bytecode ops
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
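
/*
 * A minimal usage sketch (illustrative only; `backgroundView` is an assumed name, not part of the API).
 * A `LimitedDispatcher` is normally obtained through the public `limitedParallelism` operator, e.g.:
 *
 *     @OptIn(ExperimentalCoroutinesApi::class)
 *     val backgroundView = Dispatchers.Default.limitedParallelism(2)
 *
 * At most two tasks dispatched to `backgroundView` run concurrently, while all of the actual
 * execution still happens on the threads of `Dispatchers.Default`.
 */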