-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathDispatcher.kt
142 lines (111 loc) · 4.65 KB
/
Dispatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package kotlinx.coroutines.scheduling
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.util.concurrent.*
import kotlin.coroutines.*
/**
 * Scheduler instance behind `Dispatchers.Default`.
 *
 * It is a [SchedulerCoroutineDispatcher] configured with [CORE_POOL_SIZE], [MAX_POOL_SIZE],
 * [IDLE_WORKER_KEEP_ALIVE_NS] and [DEFAULT_SCHEDULER_NAME].
 */
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    corePoolSize = CORE_POOL_SIZE,
    maxPoolSize = MAX_POOL_SIZE,
    idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS,
    schedulerName = DEFAULT_SCHEDULER_NAME,
) {

    /**
     * Validates [parallelism] and returns a dispatcher view for it.
     *
     * A request for [CORE_POOL_SIZE] or more is answered with `namedOrThis(name)`;
     * only a smaller request is delegated to the superclass implementation.
     */
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < CORE_POOL_SIZE) {
            super.limitedParallelism(parallelism, name)
        } else {
            namedOrThis(name)
        }
    }

    /**
     * Really closes the underlying dispatcher by calling the superclass `close()`.
     * Used only by Dispatchers.shutdown().
     */
    internal fun shutdown(): Unit = super.close()

    /**
     * Always throws [UnsupportedOperationException].
     *
     * Overridden in case anyone writes
     * `(Dispatchers.Default as ExecutorCoroutineDispatcher).close()`.
     */
    override fun close(): Unit =
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")

    override fun toString(): String {
        return "Dispatchers.Default"
    }
}
/**
 * The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides.
 *
 * Every task is handed to [DefaultScheduler] tagged with [BlockingContext].
 */
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    /** Submits [block] to [DefaultScheduler] as a regular (non-tail) dispatch. */
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = false)

    /** Same as [dispatch], but requests a tail dispatch from [DefaultScheduler]. */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        DefaultScheduler.dispatchWithContext(block, BlockingContext, tailDispatch = true)

    /**
     * Validates [parallelism] and returns a dispatcher view for it.
     *
     * A request for [MAX_POOL_SIZE] or more is answered with `namedOrThis(name)`;
     * only a smaller request is delegated to the superclass implementation.
     */
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism < MAX_POOL_SIZE) {
            super.limitedParallelism(parallelism, name)
        } else {
            namedOrThis(name)
        }
    }

    // This name only leaks to user code as part of the .limitedParallelism machinery
    override fun toString(): String = "Dispatchers.IO"
}
/**
 * Implementation of `Dispatchers.IO`.
 *
 * Work is forwarded to a limited-parallelism view of [UnlimitedIoScheduler]. The object also
 * acts as its own [Executor].
 */
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // View that actually runs the tasks. Its parallelism comes from systemProp(), looked up under
    // IO_PARALLELISM_PROPERTY_NAME with max(64, AVAILABLE_PROCESSORS) passed as the default value.
    private val default: CoroutineDispatcher = UnlimitedIoScheduler.limitedParallelism(
        parallelism = systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS))
    )

    override val executor: Executor
        get() {
            return this
        }

    /** [Executor] entry point: dispatches [command] with an empty coroutine context. */
    override fun execute(command: java.lang.Runnable) {
        dispatch(EmptyCoroutineContext, command)
    }

    /**
     * Builds the view from [UnlimitedIoScheduler] rather than from the default view above.
     * See documentation to Dispatchers.IO for the rationale.
     */
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
        UnlimitedIoScheduler.limitedParallelism(parallelism, name)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        default.dispatch(context, block)

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        default.dispatchYield(context, block)

    /** Always fails: this dispatcher does not support being closed. */
    override fun close(): Unit = error("Cannot be invoked on Dispatchers.IO")

    override fun toString(): String {
        return "Dispatchers.IO"
    }
}
/**
 * Coroutine dispatcher that runs tasks on a [CoroutineScheduler] it creates from its
 * constructor arguments.
 *
 * Instantiated in tests so we can test it in isolation.
 */
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    // This is a variable for test purposes, so that we can reinitialize from a clean state
    private var coroutineScheduler: CoroutineScheduler = createScheduler()

    override val executor: Executor
        get() {
            return coroutineScheduler
        }

    /** Creates a scheduler from this dispatcher's constructor arguments. */
    private fun createScheduler(): CoroutineScheduler {
        return CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block)
    }

    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        coroutineScheduler.dispatch(block, tailDispatch = true)
    }

    /** Dispatches [block] to the current scheduler with an explicit task [context]. */
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean): Unit =
        coroutineScheduler.dispatch(block, context, tailDispatch)

    override fun close(): Unit = coroutineScheduler.close()

    /**
     * For tests only: shuts the current scheduler down (passing 1000 as the timeout) and
     * replaces it with a freshly created one.
     */
    @Synchronized
    internal fun usePrivateScheduler() {
        coroutineScheduler.shutdown(1000L)
        coroutineScheduler = createScheduler()
    }

    /**
     * For tests only: shuts the current scheduler down, forwarding [timeout] unchanged to
     * `CoroutineScheduler.shutdown`.
     */
    @Synchronized
    internal fun shutdown(timeout: Long): Unit = coroutineScheduler.shutdown(timeout)

    /** For tests only: recreates the scheduler via [usePrivateScheduler]. */
    internal fun restore() {
        usePrivateScheduler()
    }
}