-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathMultithreadedDispatchers.kt
153 lines (132 loc) · 5.61 KB
/
MultithreadedDispatchers.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*
@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
return WorkerDispatcher(name)
}
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
return MultiWorkerDispatcher(name, nThreads)
}
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
private val worker = Worker.start(name = name)
override fun dispatch(context: CoroutineContext, block: Runnable) {
worker.executeAfter(0L) { block.run() }
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = schedule(timeMillis, Runnable {
with(continuation) { resumeUndispatched(Unit) }
})
continuation.disposeOnCancellation(handle)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
schedule(timeMillis, block)
private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
// Workers don't have an API to cancel sent "executeAfter" block, but we are trying
// to control the damage and reduce reachable objects by nulling out `block`
// that may retain a lot of references, and leaving only an empty shell after a timely disposal
// This is a class and not an object with `block` in a closure because that would defeat the purpose.
class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
private val disposableHolder = AtomicReference<Runnable?>(block)
override fun invoke() {
disposableHolder.value?.run()
}
override fun dispose() {
disposableHolder.value = null
}
}
val disposableBlock = DisposableBlock(block)
worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
return disposableBlock
}
override fun close() {
worker.requestTermination().result // Note: calling "result" blocks
}
private fun Long.toMicrosSafe(): Long {
val result = this * 1000
return if (result > this) result else Long.MAX_VALUE
}
}
private class MultiWorkerDispatcher(
private val name: String,
workersCount: Int
) : CloseableCoroutineDispatcher() {
private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
private val availableWorkers = Channel<Channel<Runnable>>(Channel.UNLIMITED)
private val workerPool = OnDemandAllocatingPool(workersCount) {
Worker.start(name = "$name-$it").apply {
executeAfter { workerRunLoop() }
}
}
/**
* (number of tasks - number of workers) * 2 + (1 if closed)
*/
private val tasksAndWorkersCounter = atomic(0L)
private inline fun Long.isClosed() = this and 1L == 1L
private inline fun Long.hasTasks() = this >= 2
private inline fun Long.hasWorkers() = this < 0
private fun workerRunLoop() = runBlocking {
val privateChannel = Channel<Runnable>(1)
while (true) {
val state = tasksAndWorkersCounter.getAndUpdate {
if (it.isClosed() && !it.hasTasks()) return@runBlocking
it - 2
}
if (state.hasTasks()) {
// we promised to process a task, and there are some
tasksQueue.receive().run()
} else {
availableWorkers.send(privateChannel)
privateChannel.receiveCatching().getOrNull()?.run()
}
}
}
private fun obtainWorker(): Channel<Runnable> {
// spin loop until a worker that promised to be here actually arrives.
while (true) {
val result = availableWorkers.tryReceive()
return result.getOrNull() ?: continue
}
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
val state = tasksAndWorkersCounter.getAndUpdate {
if (it.isClosed())
throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
it + 2
}
if (state.hasWorkers()) {
// there are workers that have nothing to do, let's grab one of them
obtainWorker().trySend(block)
} else {
workerPool.allocate()
// no workers are available, we must queue the task
tasksQueue.trySend(block)
}
}
override fun close() {
tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L }
val workers = workerPool.close() // no new workers will be created
while (true) {
// check if there are workers that await tasks in their personal channels, we need to wake them up
val state = tasksAndWorkersCounter.getAndUpdate {
if (it.hasWorkers()) it + 2 else it
}
if (!state.hasWorkers())
break
obtainWorker().close()
}
/*
* Here we cannot avoid waiting on `.result`, otherwise it will lead
* to a native memory leak, including a pthread handle.
*/
val requests = workers.map { it.requestTermination() }
requests.map { it.result }
}
}