Controlling parallelism in nested parallel collection #284


Closed
algobardo opened this issue Oct 30, 2017 · 3 comments
@algobardo

I have a really hard time understanding why the following behaviour of parallel collections in Scala should be the expected one.

Consider the following piece of code:

    import java.util.concurrent.TimeUnit
    import scala.collection.parallel.ForkJoinTaskSupport
    
    object test extends App {
    
      val CHANGE_ME = false
    
      val externalParallelism = if (CHANGE_ME) 10 else 2
    
      val l = (1 until 100).par
      val l2 = List(1).par // par but with only one element
    
      l2.tasksupport = new ForkJoinTaskSupport(
        new scala.concurrent.forkjoin.ForkJoinPool(externalParallelism))
      l.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
    
      l2.map { j =>
        l.map { i =>
          println(s"STARTED $i")
          TimeUnit.SECONDS.sleep(10000) // blocking
          println(s"DONE $i")
        }
      }
    }

If the code above is run with CHANGE_ME = false, you will immediately observe

    STARTED 1
    STARTED 50

showing that the current parallelism is 2, which is correct since the internal parallel collection l is bound to a task support with a fork-join pool of parallelism 2.
If you now set CHANGE_ME = true, you will surprisingly observe

    STARTED 87
    STARTED 93
    STARTED 13
    STARTED 25
    STARTED 75
    STARTED 56
    STARTED 62
    STARTED 50
    STARTED 1

showing that the internal parallel collection is running on threads that belong to the external parallel collection's pool.
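To make this visible directly, one can record the names of the worker threads instead of relying on interleaved output (a diagnostic sketch of my own, using the same 2.11-era scala.concurrent.forkjoin API as the example above; worker-thread naming is a JVM implementation detail, but it normally embeds the pool, e.g. "ForkJoinPool-1-worker-3"):

```scala
import scala.collection.parallel.ForkJoinTaskSupport

object PoolDiagnostic {
  // Diagnostic sketch: collect the names of the threads that actually run
  // a parallel map over a collection bound to its own fork-join pool.
  def threadsUsed(parallelism: Int): Set[String] = {
    val c = (1 to 64).par
    c.tasksupport = new ForkJoinTaskSupport(
      new scala.concurrent.forkjoin.ForkJoinPool(parallelism))
    c.map(_ => Thread.currentThread.getName).seq.toSet
  }
}
```

Called from a plain (non-worker) thread, at most `parallelism` distinct names show up; called from inside another pool's worker thread, the names reveal that the other pool's workers are doing the work.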

I guess the reason is to be found in executeAndWaitResult,

    def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
      val fjtask = newWrappedTask(task)

      if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
        fjtask.fork
      } else {
        forkJoinPool.execute(fjtask)
      }

      fjtask.sync()
      // if (fjtask.body.throwable != null) println("throwing: " + fjtask.body.throwable + " at " + fjtask.body)
      fjtask.body.forwardThrowable()
      fjtask.body.result
    }

where it is the current thread's identity that drives which pool the task is pushed to.

  • Is this the expected behaviour?
  • If so, how can the parallelism of l then be bounded in situations where the parallel collection is created and operated on in a ForkJoinWorkerThread?
  • The example above is artificial, but in my case I have parallel Scala tests that cause "internal" parallel collections to ignore the parallelism bound.
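One workaround I can imagine (an untested sketch of my own): hand the inner operation to a plain thread, so that inside it Thread.currentThread is not a ForkJoinWorkerThread, the isInstanceOf check fails, and the task is submitted to l's own pool via forkJoinPool.execute:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PlainThread {
  // Untested workaround sketch: run `body` on a fresh plain thread, so that
  // executeAndWaitResult falls into the forkJoinPool.execute branch and
  // respects the inner collection's own task support.
  def onPlainThread[A](body: => A): A = {
    val ec = ExecutionContext.fromExecutorService(
      Executors.newSingleThreadExecutor())
    try Await.result(Future(body)(ec), Duration.Inf)
    finally ec.shutdown()
  }
}

// e.g. in the example above: l2.map { _ => PlainThread.onPlainThread { l.map(...) } }
```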

JVM: java version "1.8.0_121"
Scala: 2.11.11

@som-snytt
Contributor

som-snytt commented Oct 31, 2017

It looks like similar behavior was subsequently fixed in the default executor.

    class MyForkJoinTaskSupport(environment0: ForkJoinPool = ForkJoinTasks.defaultForkJoinPool) extends ForkJoinTaskSupport(environment0) {
      override def execute[R, Tp](task: Task[R, Tp]): () => R = {
        val fjtask = newWrappedTask(task)

        Thread.currentThread match {
          case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
          case _ => forkJoinPool.execute(fjtask)
        }
        () => {
          fjtask.sync()
          fjtask.body.forwardThrowable()
          fjtask.body.result
        }
      }

      override def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
        val fjtask = newWrappedTask(task)

        Thread.currentThread match {
          case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
          case _ => forkJoinPool.execute(fjtask)
        }
        fjtask.sync()
        fjtask.body.forwardThrowable()
        fjtask.body.result
      }
    }
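The essential change is the pool-identity guard (`fjw.getPool eq forkJoinPool`). A self-contained sketch of that guard in isolation (using java.util.concurrent, as on 2.12+; names here are my own, not from the library):

```scala
import java.util.concurrent.{Callable, ForkJoinPool, ForkJoinWorkerThread}

object PoolGuard {
  // The guard isolated: a thread "runs in" a pool only if it is a
  // ForkJoinWorkerThread AND its pool is that very pool (reference equality).
  def runsInOwnPool(ownPool: ForkJoinPool): Boolean =
    Thread.currentThread match {
      case w: ForkJoinWorkerThread if w.getPool eq ownPool => true
      case _ => false
    }

  // Evaluate the guard from inside a task submitted to `pool`.
  def insidePool(pool: ForkJoinPool): Boolean =
    pool.submit(new Callable[Boolean] {
      def call(): Boolean = runsInOwnPool(pool)
    }).get()
}
```

From a plain thread the guard fails, so the task is handed to the collection's own pool; only a worker of that same pool is allowed to fork into its local queue.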

@SethTisue
Member

SethTisue commented Nov 1, 2017

although 2.12.5 won't be released until 2018, note that we do publish nightly builds of Scala

oh, but I see you're on 2.11, actually. ah well

note that as of Scala 2.13, the parallel collections will be a separately versioned module with its own release schedule, so it will become possible to get bugfixes like this out in a more timely manner

@som-snytt
Contributor

The custom tasksupport as shown above is a pretty easy workaround. I didn't verify it on 2.11, but it's probably OK.

I was aware par would get spun out but didn't know it had already happened. I wonder if it will have partests.
