@@ -31,19 +31,20 @@ object CommonPool : CoroutineDispatcher() {
31
31
*/
32
32
public const val DEFAULT_PARALLELISM_PROPERTY_NAME = " kotlinx.coroutines.default.parallelism"
33
33
34
- private val parallelism = run<Int > {
35
- val property = Try { System .getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME ) }
36
- if (property == null ) {
37
- (Runtime .getRuntime().availableProcessors() - 1 ).coerceAtLeast(1 )
38
- } else {
39
- val parallelism = property.toIntOrNull()
40
- if (parallelism == null || parallelism < 1 ) {
41
- error(" Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME , but has $property " )
42
- }
43
- parallelism
34
+ // Equals to -1 if not explicitly specified
35
+ private val requestedParallelism = run<Int > {
36
+ val property = Try { System .getProperty(DEFAULT_PARALLELISM_PROPERTY_NAME ) } ? : return @run - 1
37
+ val parallelism = property.toIntOrNull()
38
+ if (parallelism == null || parallelism < 1 ) {
39
+ error(" Expected positive number in $DEFAULT_PARALLELISM_PROPERTY_NAME , but has $property " )
44
40
}
41
+ parallelism
45
42
}
46
43
44
+ private val parallelism: Int
45
+ get() = requestedParallelism.takeIf { it > 0 }
46
+ ? : (Runtime .getRuntime().availableProcessors() - 1 ).coerceAtLeast(1 )
47
+
47
48
// For debug and tests
48
49
private var usePrivatePool = false
49
50
@@ -54,17 +55,31 @@ object CommonPool : CoroutineDispatcher() {
54
55
55
56
private fun createPool (): ExecutorService {
56
57
if (System .getSecurityManager() != null ) return createPlainPool()
58
+ // Reflection on ForkJoinPool class so that it works on JDK 6 (which is absent there)
57
59
val fjpClass = Try { Class .forName(" java.util.concurrent.ForkJoinPool" ) }
58
- ? : return createPlainPool()
59
- if (! usePrivatePool) {
60
+ ? : return createPlainPool() // Fallback to plain thread pool
61
+ // Try to use commonPool unless parallelism was explicitly specified or int debug privatePool mode
62
+ if (! usePrivatePool && requestedParallelism < 0 ) {
60
63
Try { fjpClass.getMethod(" commonPool" )?.invoke(null ) as ? ExecutorService }
64
+ ?.takeIf { isGoodCommonPool(fjpClass, it) }
61
65
?.let { return it }
62
66
}
67
+ // Try to create private ForkJoinPool instance
63
68
Try { fjpClass.getConstructor(Int ::class .java).newInstance(parallelism) as ? ExecutorService }
64
69
?. let { return it }
70
+ // Fallback to plain thread pool
65
71
return createPlainPool()
66
72
}
67
73
74
+ /* *
75
+ * Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
76
+ */
77
+ private fun isGoodCommonPool (fjpClass : Class <* >, executor : ExecutorService ): Boolean {
78
+ val actual = Try { fjpClass.getMethod(" getParallelism" ).invoke(executor) as ? Int }
79
+ ? : return false
80
+ return actual >= 1
81
+ }
82
+
68
83
private fun createPlainPool (): ExecutorService {
69
84
val threadId = AtomicInteger ()
70
85
return Executors .newFixedThreadPool(parallelism) {
0 commit comments