-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathCommonPool.kt
143 lines (123 loc) · 5.79 KB
/
CommonPool.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
/**
* Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
*
* If there isn't a SecurityManager present it uses [java.util.concurrent.ForkJoinPool] when available, which implements
* efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even
* when it already executes inside the pool. When available, it wraps `ForkJoinPool.commonPool` and provides a similar
* shared pool where not.
*
* If there is a SecurityManager present (as would be if running inside a Java Web Start context) then a plain thread
* pool is created. This is to work around the fact that ForkJoinPool creates threads that cannot perform
* privileged actions.
*/
internal object CommonPool : ExecutorCoroutineDispatcher() {
/**
* Name of the property that controls default parallelism level of [CommonPool].
* If the property is not specified, `Runtime.getRuntime().availableProcessors() - 1` will be used instead (or `1` for single-core JVM).
* Note that until Java 10, if an application is run within a container,
* `Runtime.getRuntime().availableProcessors()` is not aware of container constraints and will return the real number of cores.
*/
private const val DEFAULT_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.default.parallelism"
override val executor: Executor
get() = pool ?: getOrCreatePoolSync()
// Equals to -1 if not explicitly specified
private val requestedParallelism = run<Int> {
val property = Try { System.getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME) } ?: return@run -1
val parallelism = property.toIntOrNull()
if (parallelism == null || parallelism < 1) {
error("Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME, but has $property")
}
parallelism
}
private val parallelism: Int
get() = requestedParallelism.takeIf { it > 0 }
?: (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
// For debug and tests
private var usePrivatePool = false
@Volatile
private var pool: Executor? = null
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
// Reflection on ForkJoinPool class so that it works on JDK 6 (which is absent there)
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool() // Fallback to plain thread pool
// Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
if (!usePrivatePool && requestedParallelism < 0) {
Try { fjpClass.getMethod("commonPool").invoke(null) as? ExecutorService }
?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
// Try to create private ForkJoinPool instance
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
// Fallback to plain thread pool
return createPlainPool()
}
/**
* Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
*/
internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
// We cannot use getParallelism, since it lies to us (always returns at least 1)
// So we submit a task and check that getPoolSize is at least one after that
// A broken FJP (that is configured for 0 parallelism) would not execute the task and
// would report its pool size as zero.
executor.submit {}
val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int }
?: return false
return actual >= 1
}
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
@Synchronized
private fun getOrCreatePoolSync(): Executor =
pool ?: createPool().also { pool = it }
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
// CommonPool only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.enqueue(block)
}
}
// used for tests
@Synchronized
internal fun usePrivatePool() {
shutdown(0)
usePrivatePool = true
pool = null
}
// used for tests
@Synchronized
internal fun shutdown(timeout: Long) {
(pool as? ExecutorService)?.apply {
shutdown()
if (timeout > 0)
awaitTermination(timeout, TimeUnit.MILLISECONDS)
shutdownNow().forEach { DefaultExecutor.enqueue(it) }
}
pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
}
// used for tests
@Synchronized
internal fun restore() {
shutdown(0)
usePrivatePool = false
pool = null
}
override fun toString(): String = "CommonPool"
override fun close(): Unit = error("Close cannot be invoked on CommonPool")
}