Skip to content

Commit c12a9b7

Browse files
committed
Merge pull request scala#4090 from retronym/ticket/8955
SI-8955 Fix hanging fork-join pool via parallel collections
2 parents 4059d76 + 9273c33 commit c12a9b7

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

src/library/scala/concurrent/impl/ExecutionContextImpl.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,20 +101,26 @@ private[concurrent] object ExecutionContextImpl {
101101
}
102102

103103
def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
104-
104+
val numThreads = getInt("scala.concurrent.context.numThreads", "x1")
105+
// The hard limit on the number of active threads that the thread factory will produce
106+
// SI-8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure
107+
// about what the exact threshhold is. numThreads + 256 is conservatively high.
105108
val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
106109

107110
val desiredParallelism = range(
108111
getInt("scala.concurrent.context.minThreads", "1"),
109-
getInt("scala.concurrent.context.numThreads", "x1"),
112+
numThreads,
110113
maxNoOfThreads)
111114

115+
// The thread factory must provide additional threads to support managed blocking.
116+
val maxExtraThreads = getInt("scala.concurrent.context.maxExtraThreads", "256")
117+
112118
val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
113119
override def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
114120
}
115121

116122
val threadFactory = new ExecutionContextImpl.DefaultThreadFactory(daemonic = true,
117-
maxThreads = maxNoOfThreads,
123+
maxThreads = maxNoOfThreads + maxExtraThreads,
118124
prefix = "scala-execution-context-global",
119125
uncaught = uncaughtExceptionHandler)
120126

test/files/run/t8955.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import scala.collection.parallel.immutable.ParSet
2+
3+
object Test {
4+
def main(args: Array[String]): Unit = {
5+
for (i <- 1 to 2000) test()
6+
}
7+
8+
def test() {
9+
ParSet[Int]((1 to 10000): _*) foreach (x => ()) // hangs non deterministically
10+
}
11+
}
12+

0 commit comments

Comments
 (0)