
Commit 9273c33

SI-8955 Fix hanging fork-join pool via parallel collections
A recent change [1] to Scala's default fork-join thread pool caused intermittent deadlocks. This is only visible in the development series of Scala, 2.12.0-SNAPSHOT.

We changed our thread factory to place a hard limit on the number of threads created (equal to the desired parallelism). I have extracted a test case [2] that uses jsr166e directly, rather than using Scala's parallel collections and abstractions on top of FJ.

In the comments of the bug, Viktor suggests this was too aggressive and that instead we ought to increase the limit to parallelism + 256 (with a system property override). He explained:

> The number 256 is going to be the default for the max threads for
> FJP in Java 9 (down from 32k), so this change will harmonize the
> settings while making it possible to override from the outside.
>
> The cause of the deadlock is twofold:
>
> 1) The test uses ExecutionContext.global, which is not designed
>    for typical ForkJoin workloads since it has async = true
>    (FIFO instead of LIFO)
> 2) And we capped the default max number of threads to be created
>    when doing managed blocking from 32k to the number of cores
>    (a tad too aggressive, it seems)

Through testing, I found that for this example I could trigger the hang with:

  parallelismLevel | maxThreads
  -----------------+-----------
                 2 | <= 4
                 4 | <= 9
                 8 | <= 11
                16 | <= 15

I have emailed concurrency-interest [3] to help analyse the problem further, but in the interest of avoiding hangs in the scalacheck/parallel-collections test, I'm implementing Viktor's suggestion in the interim.

[1] scala#4042
[2] https://gist.github.com/retronym/2e14cdab6d5612562d95
[3] http://markmail.org/message/czphdyjxpkixeztv
1 parent 4059d76 commit 9273c33
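
For context, the pool touched by this commit reads its sizing from system properties once, when scala.concurrent.ExecutionContext.global is first initialized. The sketch below is not part of the commit (the value 512 and the object name TuneGlobalPool are purely illustrative); it shows how the new scala.concurrent.context.maxExtraThreads budget could be raised from the outside, either programmatically or via the equivalent -D JVM flag:

    // Illustrative sketch only; assumes the property name introduced in the
    // diff below. Equivalent to passing
    //   -Dscala.concurrent.context.maxExtraThreads=512
    // on the JVM command line.
    object TuneGlobalPool {
      def main(args: Array[String]): Unit = {
        // Must be set before ExecutionContext.global is touched for the first
        // time, because the properties are read during its lazy initialization.
        System.setProperty("scala.concurrent.context.maxExtraThreads", "512")

        // The global pool's thread factory now allows up to
        // maxNoOfThreads + 512 threads instead of the default + 256.
        val ec = scala.concurrent.ExecutionContext.global
        println(ec)
      }
    }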

File tree

2 files changed: +21 −3 lines changed

  src/library/scala/concurrent/impl/ExecutionContextImpl.scala
  test/files/run/t8955.scala


src/library/scala/concurrent/impl/ExecutionContextImpl.scala

Lines changed: 9 additions & 3 deletions
@@ -101,20 +101,26 @@ private[concurrent] object ExecutionContextImpl {
     }
 
     def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
-
+    val numThreads = getInt("scala.concurrent.context.numThreads", "x1")
+    // The hard limit on the number of active threads that the thread factory will produce
+    // SI-8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure
+    // about what the exact threshold is. numThreads + 256 is conservatively high.
     val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
 
     val desiredParallelism = range(
       getInt("scala.concurrent.context.minThreads", "1"),
-      getInt("scala.concurrent.context.numThreads", "x1"),
+      numThreads,
       maxNoOfThreads)
 
+    // The thread factory must provide additional threads to support managed blocking.
+    val maxExtraThreads = getInt("scala.concurrent.context.maxExtraThreads", "256")
+
     val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
       override def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
     }
 
     val threadFactory = new ExecutionContextImpl.DefaultThreadFactory(daemonic = true,
-                                                                      maxThreads = maxNoOfThreads,
+                                                                      maxThreads = maxNoOfThreads + maxExtraThreads,
                                                                       prefix = "scala-execution-context-global",
                                                                       uncaught = uncaughtExceptionHandler)
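
To make the comment about managed blocking concrete, here is a minimal sketch (not taken from the commit; names and numbers are illustrative) of the pattern that needs the extra thread head-room: tasks on the global pool that park inside blocking { ... } cause the ForkJoinPool to request compensation threads from the thread factory, and with the factory capped at exactly the parallelism level those requests can fail and leave the pool stuck.

    import scala.concurrent.{Await, Future, blocking}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Minimal sketch: many tasks block inside blocking { ... }. The pool tries
    // to create compensation threads so other tasks can still make progress;
    // the maxExtraThreads head-room added above is what gives it room to do so.
    object ManagedBlockingDemo {
      def main(args: Array[String]): Unit = {
        val tasks = (1 to 64).map { i =>
          Future {
            blocking {
              Thread.sleep(10) // stands in for real blocking I/O
            }
            i
          }
        }
        println(Await.result(Future.sequence(tasks), 30.seconds).sum)
      }
    }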

test/files/run/t8955.scala

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+import scala.collection.parallel.immutable.ParSet
+
+object Test {
+  def main(args: Array[String]): Unit = {
+    for (i <- 1 to 2000) test()
+  }
+
+  def test() {
+    ParSet[Int]((1 to 10000): _*) foreach (x => ()) // hangs non deterministically
+  }
+}
+
