Skip to content

Commit 3975391

Browse files
committed
Fixed the logic to detect broken ForkJoinPool instance and added a corresponding test
Fixes #432, #288
1 parent 1e12210 commit 3975391

File tree

2 files changed

+63
-3
lines changed

2 files changed

+63
-3
lines changed

core/kotlinx-coroutines-core/src/CommonPool.kt

+8-3
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ object CommonPool : CoroutineDispatcher() {
5858
// Reflection on ForkJoinPool class so that it works on JDK 6 (which is absent there)
5959
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
6060
?: return createPlainPool() // Fallback to plain thread pool
61-
// Try to use commonPool unless parallelism was explicitly specified or int debug privatePool mode
61+
// Try to use commonPool unless parallelism was explicitly specified or in debug privatePool mode
6262
if (!usePrivatePool && requestedParallelism < 0) {
6363
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
6464
?.takeIf { isGoodCommonPool(fjpClass, it) }
@@ -74,8 +74,13 @@ object CommonPool : CoroutineDispatcher() {
7474
/**
7575
* Checks that this ForkJoinPool's parallelism is at least one to avoid pathological bugs.
7676
*/
77-
private fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
78-
val actual = Try { fjpClass.getMethod("getParallelism").invoke(executor) as? Int }
77+
internal fun isGoodCommonPool(fjpClass: Class<*>, executor: ExecutorService): Boolean {
78+
// We cannot use getParallelism, since it lies to us (always returns at least 1)
79+
// So we submit a task and check that getPoolSize is at least one after that
80+
// A broken FJP (that is configured for 0 parallelism) would not execute the task and
81+
// would report its pool size as zero.
82+
executor.submit {}
83+
val actual = Try { fjpClass.getMethod("getPoolSize").invoke(executor) as? Int }
7984
?: return false
8085
return actual >= 1
8186
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import kotlinx.coroutines.experimental.*
6+
import org.junit.*
7+
import org.junit.Assert.*
8+
import java.lang.reflect.*
9+
import java.util.concurrent.*
10+
11+
class CommonPoolTest {
12+
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
13+
14+
@Test
15+
fun testIsGoodCommonPool() {
16+
// Test only on JDKs that has all we need
17+
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") } ?: return
18+
val wtfClass = Try { Class.forName("java.util.concurrent.ForkJoinPool${'$'}ForkJoinWorkerThreadFactory") } ?: return
19+
val dwtfClass = Try { Class.forName("java.util.concurrent.ForkJoinPool${'$'}DefaultForkJoinWorkerThreadFactory") } ?: return
20+
// We need private constructor to create "broken" FJP instance
21+
val fjpCtor = Try { fjpClass.getDeclaredConstructor(
22+
Int::class.java,
23+
wtfClass,
24+
Thread.UncaughtExceptionHandler::class.java,
25+
Int::class.java,
26+
String::class.java
27+
) } ?: return
28+
fjpCtor.isAccessible = true
29+
val dwtfCtor = Try { dwtfClass.getDeclaredConstructor() } ?: return
30+
dwtfCtor.isAccessible = true
31+
// Create bad pool
32+
val fjp0: ExecutorService = createFJP(0, fjpCtor, dwtfCtor) ?: return
33+
assertFalse(CommonPool.isGoodCommonPool(fjpClass, fjp0))
34+
fjp0.shutdown()
35+
// Create good pool
36+
val fjp1: ExecutorService = createFJP(1, fjpCtor, dwtfCtor) ?: return
37+
assertTrue(CommonPool.isGoodCommonPool(fjpClass, fjp1))
38+
fjp1.shutdown()
39+
println("CommonPool.isGoodCommonPool test passed")
40+
}
41+
42+
fun createFJP(
43+
parallelism: Int,
44+
fjpCtor: Constructor<out Any>,
45+
dwtfCtor: Constructor<out Any>
46+
): ExecutorService? = Try {
47+
fjpCtor.newInstance(
48+
parallelism,
49+
dwtfCtor.newInstance(),
50+
Thread.getDefaultUncaughtExceptionHandler(),
51+
0,
52+
"Worker"
53+
)
54+
} as? ExecutorService
55+
}

0 commit comments

Comments
 (0)