-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathMultithreadedDispatchers.kt
178 lines (155 loc) · 6.97 KB
/
MultithreadedDispatchers.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@file:OptIn(ObsoleteWorkersApi::class)
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.concurrent.AtomicReference
import kotlin.native.concurrent.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds
/**
 * Native `actual` implementation: builds a [MultiWorkerDispatcher] called [name]
 * whose worker count is [nThreads].
 *
 * @param nThreads requested number of threads; must be at least one.
 * @param name name handed to the created dispatcher.
 * @return the newly created dispatcher, which the caller is expected to close.
 * @throws IllegalArgumentException if [nThreads] is less than one.
 */
@DelicateCoroutinesApi
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    require(nThreads > 0) { "Expected at least one thread, but got: $nThreads" }
    return MultiWorkerDispatcher(name = name, workersCount = nThreads)
}
/**
 * A closeable dispatcher that executes everything on one dedicated [Worker] and also acts as
 * the [Delay] implementation, using the same worker as its timer.
 *
 * @param name name given to the underlying worker.
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Submits [block] to the worker with a zero delay; [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val task: () -> Unit = { block.run() }
        worker.executeAfter(0L, task)
    }

    /**
     * Schedules an undispatched resumption of [continuation] in [timeMillis] milliseconds and
     * registers the resulting handle through `disposeOnCancellation`.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeTask = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val timerHandle = schedule(timeMillis, resumeTask)
        continuation.disposeOnCancellation(timerHandle)
    }

    /** Schedules [block] to run in [timeMillis] milliseconds; [context] is not consulted. */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        return schedule(timeMillis, block)
    }

    /**
     * Arranges for [block] to be executed by the worker once [timeMillis] milliseconds have passed.
     *
     * @return a handle whose disposal turns the scheduled execution into a no-op.
     */
    private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
        // A worker offers no way to withdraw something already passed to "executeAfter".
        // To limit what stays reachable, the payload lives in a nullable atomic slot that
        // disposal clears, so only a hollow wrapper remains queued afterwards.
        // It has to be a class taking `block` as a plain constructor argument: an object
        // capturing `block` in its closure would keep it alive and defeat the purpose.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val pending = AtomicReference<Runnable?>(block)

            override fun invoke() {
                val task = pending.value
                if (task != null) task.run()
            }

            override fun dispose() {
                pending.value = null
            }

            fun isDisposed(): Boolean = pending.value == null
        }

        // Long waits are split into hops of at most 100 ms; each hop re-checks disposal,
        // so a disposed block is dropped at the next hop instead of waiting for the deadline.
        fun Worker.runAfterDelay(block: DisposableBlock, targetMoment: TimeMark) {
            if (block.isDisposed()) return
            val remaining = -targetMoment.elapsedNow()
            val hop = 100.milliseconds
            when {
                remaining > hop ->
                    executeAfter(hop.inWholeMicroseconds) { runAfterDelay(block, targetMoment) }
                else ->
                    // The deadline may already be behind us, hence the clamp to zero.
                    executeAfter(remaining.inWholeMicroseconds.coerceAtLeast(0L), block)
            }
        }

        return DisposableBlock(block).also { scheduled ->
            val deadline = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
            worker.runAfterDelay(scheduled, deadline)
        }
    }

    /** Asks the worker to terminate and waits until it has done so. */
    override fun close() {
        val termination = worker.requestTermination()
        termination.result // Note: reading "result" blocks
    }
}
/**
 * A closeable dispatcher that hands [Runnable]s to a pool of [Worker]s.
 *
 * Coordination between [dispatch], the workers and [close] goes through a single atomic counter
 * ([tasksAndWorkersCounter]) and two unlimited channels: [tasksQueue] for tasks that found no idle
 * worker, and [availableWorkers] for the continuations of workers that found no task.
 *
 * @param name used in the names of the started workers and in the "closed" error message.
 * @param workersCount passed to the worker pool (presumably its capacity — confirm against
 * `OnDemandAllocatingPool`) and used as the threshold in [limitedParallelism].
 */
private class MultiWorkerDispatcher(
private val name: String,
private val workersCount: Int
) : CloseableCoroutineDispatcher() {
// Tasks submitted while no idle worker was recorded in the counter; see [dispatch].
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
// Continuations of workers suspended in [workerRunLoop]; resuming one with a Runnable gives that worker its task.
private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
// Every worker created here is named "$name-$it" and immediately starts running [workerRunLoop].
// NOTE(review): `it` is presumably the worker's index within the pool — confirm against OnDemandAllocatingPool.
private val workerPool = OnDemandAllocatingPool(workersCount) {
Worker.start(name = "$name-$it").apply {
executeAfter { workerRunLoop() }
}
}
/**
 * Packed state: (number of tasks - number of workers) * 2 + (1 if closed).
 *
 * [dispatch] adds 2 for every submitted task and [workerRunLoop] subtracts 2 every time a worker
 * asks for one, so a value of 2 or more means there are tasks no worker has claimed yet and a
 * negative value means there are workers waiting for a task. The lowest bit is set by [close].
 */
private val tasksAndWorkersCounter = atomic(0L)
// True when the lowest bit (the "closed" flag) is set.
private inline fun Long.isClosed() = this and 1L == 1L
// True when at least one task is not yet claimed by a worker.
private inline fun Long.hasTasks() = this >= 2
// True when at least one worker has asked for a task and not been given one.
private inline fun Long.hasWorkers() = this < 0
/**
 * The loop executed by every worker of the pool.
 *
 * Each iteration claims one task by subtracting 2 from the counter. If the previous value showed
 * unclaimed tasks, one is received from [tasksQueue] and run; otherwise the worker publishes its
 * continuation to [availableWorkers], suspends, and runs the [Runnable] it is resumed with.
 * The loop ends once the dispatcher is closed and no unclaimed tasks remain.
 */
private fun workerRunLoop() = runBlocking {
while (true) {
val state = tasksAndWorkersCounter.getAndUpdate {
// Closed and nothing left to claim: leave the loop without touching the counter.
if (it.isClosed() && !it.hasTasks()) return@runBlocking
it - 2
}
// `state` is the value before the subtraction.
if (state.hasTasks()) {
// we promised to process a task, and there are some
tasksQueue.receive().run()
} else {
try {
// The counter was decremented before this continuation is published, so a dispatcher
// that already saw the negative value may be waiting for it in [obtainWorker].
suspendCancellableCoroutine {
val result = availableWorkers.trySend(it)
checkChannelResult(result)
}.run()
} catch (e: CancellationException) {
// We were cancelled from [close] and thus will never get back to this branch of code,
// but there may still be pending work, so we can't just exit here.
}
}
}
}
// Takes the continuation of a waiting worker. Callers invoke this only after the counter told them
// such a worker exists; since a worker decrements the counter before it publishes its continuation,
// the channel may still be empty, in which case we wait for the continuation in a blocking manner.
private fun obtainWorker(): CancellableContinuation<Runnable> =
availableWorkers.tryReceive().getOrNull() ?: runBlocking { availableWorkers.receive() }
/**
 * Submits [block] for execution: it is handed directly to a waiting worker if the counter
 * records one, otherwise it is put into [tasksQueue].
 *
 * @throws IllegalStateException if the dispatcher has already been closed.
 */
override fun dispatch(context: CoroutineContext, block: Runnable) {
val state = tasksAndWorkersCounter.getAndUpdate {
if (it.isClosed())
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
it + 2
}
// `state` is the value before the addition.
if (state.hasWorkers()) {
// there are workers that have nothing to do, let's grab one of them
obtainWorker().resume(block)
} else {
// NOTE(review): presumably starts one more worker unless the pool is already full — confirm.
workerPool.allocate()
// no workers are available, we must queue the task
val result = tasksQueue.trySend(block)
checkChannelResult(result)
}
}
/**
 * Returns this dispatcher (via `namedOrThis`, presumably attaching [name] when it is non-null —
 * confirm) when the requested [parallelism] is at least [workersCount]; otherwise defers to the
 * superclass implementation.
 */
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= workersCount) {
return namedOrThis(name)
}
return super.limitedParallelism(parallelism, name)
}
/**
 * Closes the dispatcher: sets the "closed" bit, closes the worker pool, cancels every worker that
 * is waiting for a task, and then blocks until all workers have terminated.
 */
override fun close() {
// Set the "closed" bit; the value is left untouched if it is already set.
tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L }
val workers = workerPool.close() // no new workers will be created
while (true) {
// check if there are workers that await tasks in their personal channels, we need to wake them up
// Each pass takes one waiting worker out of the count (+2) and cancels its continuation.
val state = tasksAndWorkersCounter.getAndUpdate {
if (it.hasWorkers()) it + 2 else it
}
if (!state.hasWorkers())
break
obtainWorker().cancel()
}
/*
 * Here we cannot avoid waiting on `.result`, otherwise it will lead
 * to a native memory leak, including a pthread handle.
 */
// Termination is requested from all workers first and only then awaited.
val requests = workers.map { it.requestTermination() }
requests.map { it.result }
}
/**
 * Throws [IllegalStateException] (with the channel's failure as the cause, if any) when [result]
 * is not successful; used after `trySend` on the two channels of this dispatcher.
 */
private fun checkChannelResult(result: ChannelResult<*>) {
if (!result.isSuccess)
throw IllegalStateException(
"Internal invariants of $this were violated, please file a bug to kotlinx.coroutines",
result.exceptionOrNull()
)
}
}