Skip to content

Commit 12b74ac

Browse files
authored
Merge pull request #35 from som-snytt/issue/tasktest
TaskSupport respects specific FJP
2 parents 4ba26d6 + a4e626b commit 12b74ac

File tree

2 files changed

+36
-10
lines changed

2 files changed

+36
-10
lines changed

core/src/main/scala/scala/collection/parallel/Tasks.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -254,12 +254,10 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
254254
def execute[R, Tp](task: Task[R, Tp]): () => R = {
255255
val fjtask = newWrappedTask(task)
256256

257-
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
258-
fjtask.fork
259-
} else {
260-
forkJoinPool.execute(fjtask)
257+
Thread.currentThread match {
258+
case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
259+
case _ => forkJoinPool.execute(fjtask)
261260
}
262-
263261
() => {
264262
fjtask.sync()
265263
fjtask.body.forwardThrowable()
@@ -277,12 +275,10 @@ trait ForkJoinTasks extends Tasks with HavingForkJoinPool {
277275
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
278276
val fjtask = newWrappedTask(task)
279277

280-
if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
281-
fjtask.fork
282-
} else {
283-
forkJoinPool.execute(fjtask)
278+
Thread.currentThread match {
279+
case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
280+
case _ => forkJoinPool.execute(fjtask)
284281
}
285-
286282
fjtask.sync()
287283
// if (fjtask.body.throwable != null) println("throwing: " + fjtask.body.throwable + " at " + fjtask.body)
288284
fjtask.body.forwardThrowable()
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package scala.collection.parallel
2+
3+
import org.junit.Test
4+
import org.junit.Assert._
5+
6+
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}, ForkJoinPool._
7+
8+
import CollectionConverters._
9+
10+
class TaskTest {
11+
@Test
12+
def `t10577 task executes on foreign pool`(): Unit = {
13+
def mkFactory(name: String) = new ForkJoinWorkerThreadFactory {
14+
override def newThread(pool: ForkJoinPool) = {
15+
val t = new ForkJoinWorkerThread(pool) {}
16+
t.setName(name)
17+
t
18+
}
19+
}
20+
def mkPool(name: String) = new ForkJoinPool(1, mkFactory(name), null, false)
21+
22+
val one = List(1).par
23+
val two = List(2).par
24+
25+
one.tasksupport = new ForkJoinTaskSupport(mkPool("one"))
26+
two.tasksupport = new ForkJoinTaskSupport(mkPool("two"))
27+
28+
for (x <- one ; y <- two) assertEquals("two", Thread.currentThread.getName)
29+
}
30+
}

0 commit comments

Comments
 (0)