inner .par collection breaks outer ForkJoinTaskSupport on Linux x86 Scala 2.12.5 #283

Closed
ktegan opened this issue Jul 23, 2018 · 30 comments

@ktegan commented Jul 23, 2018

I first submitted this to Stack Overflow, where Volodymyr Glushak was not able to reproduce the bug on macOS or on Scastie. I'm hoping that someone else can reproduce this issue or figure out what about my current environment might be causing the problem.

I'm using Scala 2.12.5 and Java OpenJDK 1.8.0_161-b14 on a Linux 3.10.0 x86_64 kernel.

I want to make a parallel collection that uses a fixed number of threads. The standard advice for this is to set the collection's tasksupport to a ForkJoinTaskSupport backed by a ForkJoinPool with a fixed number of threads. That works fine UNTIL the processing you do inside the parallel collection itself uses a parallel collection; when it does, the limit set for the ForkJoinPool appears to go away.

A simple test looks something like the following:

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object InnerPar {
  val numTasks = 100
  val numThreads = 10

  /** return maximum number of simultaneous threads running when requesting numThreads */
  def forkJoinPoolMaxThreads(useInnerPar:Boolean): Int = {

    // every thread in the outer collection will increment
    // and decrement this counter as it starts and exits
    val threadCounter = new AtomicInteger(0)

    // this function increments and decrements threadCounter
    // on start and exit, optionally creates an inner parallel collection
    // and finally returns the thread count it found at startup
    def incrementAndCountThreads(idx:Int):Int = {
      val otherThreadsRunning:Int = threadCounter.getAndAdd(1)
      if (useInnerPar) {
        (0 until 20).toSeq.par.map { elem => elem + 1 }
      }
      Thread.sleep(10)
      threadCounter.getAndAdd(-1)
      otherThreadsRunning + 1
    }

    // create parallel collection using a ForkJoinPool with numThreads
    val parCollection = (0 until numTasks).toVector.par
    parCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
    val threadCountLogList = parCollection.map { idx =>
      incrementAndCountThreads(idx)
    }

    // return the maximum number of threads running
    // simultaneously (as counted at each thread start)
    threadCountLogList.max
  } 


  def main(args:Array[String]):Unit = {
    val testConfigs = Seq(true, false, true, false)
    testConfigs.foreach { useInnerPar =>
      val maxThreads = forkJoinPoolMaxThreads(useInnerPar)

      // the total number of threads running should not have exceeded
      // numThreads at any point
      val isSuccess = (maxThreads <= numThreads)

      println(f"useInnerPar $useInnerPar%6s, success is $isSuccess%6s   (maxThreads $maxThreads%4d)") 
    }
  }
}

From this, in my environment, I get the following output, showing that more than numThreads (10 in the example) threads run simultaneously if we create a parallel collection inside incrementAndCountThreads().

useInnerPar   true, success is  false   (maxThreads   22)
useInnerPar  false, success is   true   (maxThreads   10)
useInnerPar   true, success is  false   (maxThreads   24)
useInnerPar  false, success is   true   (maxThreads   10)

Also note that setting a ForkJoinTaskSupport on the inner collection does not fix the problem. In other words, you get the same results if you use the following code for the inner collection:

  if (useInnerPar) {
    val innerParCollection = (0 until 20).toVector.par
    innerParCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))
    innerParCollection.map { elem => elem + 1 }
  }

Am I missing something? If not, is there a way to work around this?

Thanks!

@dlaflamme commented Aug 20, 2018

I came across this while investigating strange behavior with parallel collections. Using ktegan's sample code, I was able to replicate it on three different configurations: two Linux and one Windows. See my results below. This is really weird. Is using nested parallel collections like this not supported?

I've made a gist of the test app so you can quickly wget the code and test on your host.

https://gist.githubusercontent.com/dlaflamme/81c94e9afdae0fc61d79a93cc3310656/raw/c7f56d7319440ed2c688fe61d4d3483fee1f14d5/InnerPar.scala

CentOS:

$ cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
$
$ uname -a
Linux centosj1 3.10.0-862.9.1.el7.x86_64 #1 SMP Mon Jul 16 16:29:36 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
$ scala -version
Scala code runner version 2.12.6 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.
$ scala InnerPar.scala
useInnerPar   true, success is  false   (maxThreads   22)
useInnerPar  false, success is   true   (maxThreads   10)
useInnerPar   true, success is  false   (maxThreads   27)
useInnerPar  false, success is   true   (maxThreads   10)
$

Ubuntu:

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 18.04.1 LTS
Release:    18.04
Codename:   bionic
$ uname -a
Linux ubuntuj1 4.15.0-32-generic #35-Ubuntu SMP Fri Aug 10 17:58:07 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
$  java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-0ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
$ scala -version
Scala code runner version 2.12.6 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.
$  scala InnerPar.scala
useInnerPar   true, success is  false   (maxThreads   22)
useInnerPar  false, success is   true   (maxThreads   10)
useInnerPar   true, success is  false   (maxThreads   27)
useInnerPar  false, success is   true   (maxThreads   10)
$

Windows (from IntelliJ sbt shell):

[IJ]sbt:hello-scala> eval System.getProperty("java.version")
[info] ans: String = 1.8.0_162
[IJ]sbt:hello-scala>
[IJ]sbt:hello-scala> about
[info] This is sbt 1.1.6
[info] The current project is ProjectRef(uri("file:/C:/hello-scala/"), "root") 0.1.0-SNAPSHOT
[info] The current project is built against Scala 2.12.6
[info] Available Plugins: sbt.plugins.IvyPlugin, sbt.plugins.JvmPlugin, sbt.plugins.CorePlugin, sbt.plugins.JUnitXmlReportPlugin, sbt.plugins.Giter8TemplatePlugin, org.jetbrains.sbt.StructurePlugin, org.jetbrains.sbt.IdeaShellPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.12.6
[IJ]sbt:hello-scala> runMain example.InnerPar
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list
[info] Running example.InnerPar
useInnerPar   true, success is  false   (maxThreads   17)
useInnerPar  false, success is   true   (maxThreads   10)
useInnerPar   true, success is  false   (maxThreads   21)
useInnerPar  false, success is   true   (maxThreads   10)
[success] Total time: 3 s, completed Aug 19, 2018 9:10:54 PM
[IJ]sbt:hello-scala>

On the same Windows host, running with whatever version of Scala is on my PATH at the CLI, it works as expected:

C:\hello-scala>java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)

Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
C:\hello-scala>scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.
C:\hello-scala>
C:\hello-scala>scala src\main\scala\example\InnerPar.scala
useInnerPar   true, success is   true   (maxThreads   10)
useInnerPar  false, success is   true   (maxThreads   10)
useInnerPar   true, success is   true   (maxThreads   10)
useInnerPar  false, success is   true   (maxThreads   10)

C:\hello-scala>

@dlaflamme

@SethTisue, what exactly does it mean for this issue to be moved to the Backlog? It seems to me that the behavior observed here completely breaks the user's control of concurrency in their programs, and it affects the latest two released versions of Scala running on the latest version of OpenJDK's 1.8 series.

More information:
It looks like this problem was introduced between Scala 2.12.4 and 2.12.5. I am unable to get ktegan's test to fail on 2.12.4, but it fails consistently on 2.12.5 and 2.12.6 (all tests on the same host and OpenJDK version).

@SethTisue (Member) commented Aug 22, 2018

what exactly does it mean for this issue to be moved to the Backlog?

“Backlog” issues are issues we won't hold up any particular release over. but any such classification is only provisional, of course

I'd assumed this was a longstanding issue, but if it in fact regressed after 2.12.4, then ah, that's different. so I've moved it to the 2.12.7 milestone (but note that doesn't promise a fix, it just means the issue will at least be considered during release planning)

is there a post-2.12.4 change that's the likely culprit?

@ktegan (Author) commented Aug 22, 2018

Both of these commits are related to ForkJoinPool and are around the right time:

scala/scala@59e1041
scala/scala@14caff6

@SethTisue (Member)

@viktorklang @som-snytt ?

@viktorklang (Contributor)

@SethTisue I'm not sure how it is intended to work. I don't see how my commit would impact this, but please let me know if it seems to be causing this problem.

@som-snytt (Contributor) commented Aug 22, 2018

The example code is executing the "outer" tasks on the custom pool, but the "inner" tasks on the global pool. For the old behavior, set the "task support" also for the "inner" collection.

There is also a misunderstanding that parallelism equals threads; it's really the target for the number of running threads. JDK 9 has a new constructor for ForkJoinPool that takes corePoolSize and maximumPoolSize.

It might be the inner.join() that tells the pool to create a thread; I don't have the source to look at.
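
To make the suggested workaround concrete, here is a minimal sketch (my reading of the advice above, not code from this thread): both collections share a single ForkJoinTaskSupport, so inner tasks are forked on the same pool instead of being submitted to, and joined across, a second pool.

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object SharedPool {
  def main(args: Array[String]): Unit = {
    // one pool and one task support, shared by the outer and inner collections
    val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

    val outer = (0 until 100).toVector.par
    outer.tasksupport = taskSupport
    outer.foreach { _ =>
      val inner = (0 until 20).toVector.par
      inner.tasksupport = taskSupport // same pool: inner work forks rather than blocking a worker
      inner.map(_ + 1)
    }
  }
}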

@viktorklang (Contributor)

Thanks @som-snytt!

@ktegan (Author) commented Aug 23, 2018

Hold on a second. The problem persists whether or not you set the task support for the inner collection (as shown in the alternative code snippet at the bottom of the issue description).

Also, I don't understand your answer. If the outer tasks are running in a custom pool, then why does creating or not creating an inner parallel collection change the behavior of the outer collection? An immutable parallel collection in Scala should have roughly the same concurrency behavior regardless of what code executes inside a call to map(). That contract is broken (the outer collection appears to be incorrectly mutated) when an inner collection is created inside one of the threads, because all of a sudden the outer parallel collection spawns many more threads than requested. The AtomicInteger in the test only measures the number of threads running in parallel for the outer pool, so its numbers shouldn't really depend on the code executing inside the parallel collection, but the results show otherwise.

@som-snytt (Contributor)

Yes, it's a little mind-bending. The idea is that the outer pool will allocate more threads when it thinks a thread is blocked, i.e., active but not running. When the inner task is run on the same pool, the task is forked; otherwise it is submitted to the other pool. I suspect that in the case of submitting to the second pool, joining that task blocks. Note that the pool spawns threads to keep parallelism-many threads running, not just active (created but not dead).

That's why the counter in the test isn't really counting threads; it's counting tasks. As an implementation detail, those happen to correspond to threads, but it's not counting running threads. Imagine those tasks used managed blocking because they're doing I/O: that would explicitly tell the pool to go ahead and grow to keep making progress.
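
To illustrate managed blocking concretely, here is a minimal sketch against the standard java.util.concurrent API (my example, independent of the collections code): each task wraps its blocking call in ForkJoinPool.managedBlock, which tells the pool it may start compensation threads so that roughly parallelism-many tasks keep running.

import java.util.concurrent.ForkJoinPool

object ManagedBlockDemo {
  def main(args: Array[String]): Unit = {
    val pool = new ForkJoinPool(2) // parallelism target of 2
    val tasks = (1 to 8).map { _ =>
      pool.submit(new Runnable {
        def run(): Unit = {
          // announce the block; the pool may grow past 2 workers to compensate
          ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
            @volatile private var done = false
            def block(): Boolean = { Thread.sleep(50); done = true; true }
            def isReleasable(): Boolean = done
          })
          println(s"pool size is now ${pool.getPoolSize}") // often exceeds 2
        }
      })
    }
    tasks.foreach(_.get())
    pool.shutdown()
  }
}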

Here's my version of the sample code with println added during the Red Sox game. Comment out setting tasksupport on the "inner" collection to see the bad behavior.

package nestedpar

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object InnerPar {
  val numTasks = 10
  val numThreads = 4

  /** return maximum number of simultaneous threads running when requesting numThreads */
  def forkJoinPoolMaxThreads(useInnerPar: Boolean): Int = {
    val pool = new ForkJoinPool(
      numThreads,
      /*
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null,
      true,
      4,
      4,
      1,
      null,
      60,
      java.util.concurrent.TimeUnit.SECONDS
      */
    )
    /* Java 9 API
    ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime, TimeUnit unit)
    */
    val taskSupport = new ForkJoinTaskSupport(pool)

    // every thread in the outer collection will increment
    // and decrement this counter as it starts and exits
    val threadCounter = new AtomicInteger(0)

    // this function increments and decrements threadCounter
    // on start and exit, optionally creates an inner parallel collection
    // and finally returns the thread count it found at startup
    def incrementAndCountThreads(idx: Int): Int = {
      println(s"Outer op on: ${Thread.currentThread}")
      val threadsRunning: Int = threadCounter.incrementAndGet()
      if (useInnerPar) {
        val c = (0 until 20).toSeq.par
        c.tasksupport = taskSupport
        println(s"Inner op on: ${Thread.currentThread}, c.tasksupport is ${c.tasksupport}, env is ${c.tasksupport.environment}")
        c.map { elem =>
          println(s"Inner map on: ${Thread.currentThread}")
          elem + 1
        }
        //(0 until 20).toSeq.par.map { elem => elem + 1 }
      } else {
        /* Any execute on a different pool shows the symptom
        List(1).par.map { elem =>
          println(s"Inner map on: ${Thread.currentThread}")
          elem + 1
        }
        */
      }
      Thread.sleep(10L)
      threadCounter.decrementAndGet()
      println(s"Running: $threadsRunning")
      threadsRunning
    }
    // create parallel collection using a ForkJoinPool with numThreads
    val parCollection = (1 to numTasks).toVector.par
    //parCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
    parCollection.tasksupport = taskSupport
    println(s"parC.tasksupport is ${parCollection.tasksupport}, env is ${parCollection.tasksupport.environment}")
    val threadCountLogList = parCollection.map { idx =>
      incrementAndCountThreads(idx)
    }
    // return the maximum number of threads running
    // simultaneously (as counted at each thread start)
    threadCountLogList.max
  }

  def main(args:Array[String]):Unit = {
    println(s"global is ${concurrent.ExecutionContext.global}")
    //val testConfigs = Seq(true, false, true, false)
    val testConfigs = Seq(true)
    testConfigs.foreach { useInnerPar =>
      val maxThreads = forkJoinPoolMaxThreads(useInnerPar)

      // the total number of threads running should not have exceeded
      // numThreads at any point
      val isSuccess = (maxThreads <= numThreads)

      println(f"useInnerPar $useInnerPar%6s, success is $isSuccess%6s   (maxThreads $maxThreads%4d)")
    }
  }
}

@som-snytt (Contributor)

Another thought: it might be possible to have the par framework start operations on the global pool by default, but for map to fork (use the current pool) if tasksupport has not been explicitly set by the user.

I don't know which behavior would be most expected by users. This probably goes to the core intuition of the framework: that you're using ordinary collection methods, but they are parallelized without further ado.

@ktegan (Author) commented Aug 23, 2018

Thanks, I think I understand things a little bit better now.

So this behavior of "the outer pool will allocate more threads when it thinks a thread is blocked" was added after Scala 2.12.4? Is there some way to cap the number of simultaneous tasks, or alternatively to cap the number of active (not running) threads? In my case each task allocates part of a finite resource (such as memory), so what I really want to cap is tasks, not running threads.

@som-snytt (Contributor)

Sorry, I also just noticed your comment about setting tasksupport. It has to be the same pool for the inner collection. It sounds like you just want the old behavior, so I would just do that.

The issue that changed the behavior was one where the inner collection had an explicitly set tasksupport that was previously ignored.

Otherwise you want the Java 9 FJP as in my comment.

NB: I'll submit a PR to use tasksupport only if it is explicitly set by the user, in case that is the preferred ergonomics.
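
For reference, a hedged sketch of that Java 9+ constructor (the parameter values are my guesses at something sensible, and it requires running on JDK 9 or later; note that with a null saturate predicate, a pool that cannot grow past the cap throws RejectedExecutionException instead of spawning compensation threads):

import java.util.concurrent.{ForkJoinPool, TimeUnit}
import scala.collection.parallel.ForkJoinTaskSupport

val cappedPool = new ForkJoinPool(
  4,                                               // parallelism: target for running threads
  ForkJoinPool.defaultForkJoinWorkerThreadFactory, // thread factory
  null,                                            // no UncaughtExceptionHandler
  false,                                           // asyncMode: LIFO task queues
  4,                                               // corePoolSize
  4,                                               // maximumPoolSize: hard cap on total threads
  1,                                               // minimumRunnable
  null,                                            // saturate: null rejects instead of growing
  60L, TimeUnit.SECONDS                            // keepAliveTime for idle workers
)
val cappedTaskSupport = new ForkJoinTaskSupport(cappedPool)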

@viktorklang (Contributor)

@som-snytt I also just noticed that there's a possible issue from 2.13.0-M5 onwards with the following line: https://github.com/scala/scala-parallel-collections/blob/master/core/src/main/scala/scala/collection/parallel/Tasks.scala#L394

Since https://github.com/scala/scala/blob/2.13.x/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L110

So I venture to say that Tasks should also check whether the ExecutionContext is an instance of ForkJoinPool?

@ktegan (Author) commented Aug 23, 2018

Thanks for the comments and help, everyone; they are much appreciated.

  1. @som-snytt You said that "The issue that changed the behavior was where the inner collection has an explicitly set tasksupport, but previously that was ignored." However, based on comments by @dlaflamme it sounds like the test shows different behavior between versions 2.12.4 and 2.12.5 when the inner collection does not explicitly set a tasksupport. Any idea what might account for this change?

  2. In the above test, if useInnerPar is false the thread still calls Thread.sleep(10), which would seem to block the thread. However, in this case the outer collection does not create more tasks than originally specified; it is only when an inner parallel collection is created and executed that the behavior changes. I tried sleeping for 100ms instead and still saw no change in behavior. Is there some kind of task (reading from a file?) that I can run to confirm that the outer collection spawning more tasks than expected is related to some kind of thread-blocking operation and isn't specific to using an inner parallel collection?

  3. Do I understand correctly that in Java 8 there is no easy way to put a hard cap on the total number of simultaneous tasks, whereas in Java 9 there are extra controls that make this possible? If so, what is the new Java 9 parameter or feature?

  4. If anyone has an idea why you can only create and set tasksupports for about ~1200 collections (Repeated use of ForkJoinTaskSupport on ParCollection causes OutOfMemoryException, scala/bug#11097), any tips would be much appreciated; see the sketch below for one guess. A fix would be ideal, but right now I'm not even sure what resource is running out (Java says it cannot create any more native threads).
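
On (4), one hedged guess rather than a diagnosis: each ForkJoinPool keeps its worker threads until it is shut down or they idle out, so creating pools in a loop without shutting them down can exhaust native threads. A hypothetical helper that releases the pool after use:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// hypothetical helper: run one parallel map on a bounded pool,
// then shut the pool down so its worker threads can be reclaimed
def withBoundedPar[A, B](xs: Vector[A], threads: Int)(f: A => B): Vector[B] = {
  val pool = new ForkJoinPool(threads)
  try {
    val p = xs.par
    p.tasksupport = new ForkJoinTaskSupport(pool)
    p.map(f).seq
  } finally pool.shutdown()
}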

@ktegan (Author) commented Aug 23, 2018

@viktorklang, for (3) it sounds like we're saying that the new maximumPoolSize parameter in Java 9 will limit the number of live threads, rather than targeting the number of currently running threads as (I'm gathering) the parallelism parameter does. Thanks!

@viktorklang (Contributor)

@ktegan To put it differently: parallelism is a target, not a limit.

@som-snytt (Contributor)

@viktorklang Thanks. The architecture makes me feel like I need a "conspiracy wall".

@ktegan The changed behavior is on 2.12.5. The commit page (your link above) shows the tags and also the PR link.

@ktegan (Author) commented Aug 23, 2018

@som-snytt I see the link, but my question is how the code in the link applies to the current issue. You said "The issue that changed the behavior was where the inner collection has an explicitly set tasksupport ..." Based on that, I thought you were saying that if the inner collection DOES NOT explicitly set its tasksupport, then the PR in the link should have no effect. If you agree with that statement, then my question is: if your PR only affects cases where the inner collection explicitly sets the task support, why are we seeing behavior changes when the inner collection does not explicitly set the task support?

@som-snytt (Contributor)

@ktegan The original issue was #284 as linked from the PR. It's the same as your example, except the bug was that the tasksupport was ignored. If tasksupport is not set explicitly, you get a default tasksupport.

The documentation is unambiguous:

Each parallel collection is parametrized with a task support object which is responsible for scheduling and load-balancing tasks to processors.

https://docs.scala-lang.org/overviews/parallel-collections/configuration.html
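
For instance, a tiny sketch of that default (Scala 2.12 syntax; the comments describe what I would expect to see, not captured output):

val c = (1 to 10).par
println(c.tasksupport) // default: an ExecutionContextTaskSupport backed by ExecutionContext.global
c.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(
  new java.util.concurrent.ForkJoinPool(4)) // explicitly parametrize with a custom pool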

@ktegan (Author) commented Aug 23, 2018

Sorry, my question isn't coming across well. Would you expect the PR you did for #284 to affect cases where the inner collection does not set the tasksupport?

@som-snytt (Contributor)

Yes, the tasksupport is used whether it was set by the user or not. That's why I point out that "each par collection is parametrized with a task support"; I don't see anything that says it is pre-empted in some cases.

My Nota Bene above is about the case where it is not explicitly set.

@ktegan (Author) commented Aug 24, 2018

OK, it seems like there is no simple short-term solution to this. Until we can move to Java 9 we might go back to 2.12.4 so that the target thread count is respected 99% of the time instead of 0% of the time.

Thanks for the help!

@som-snytt (Contributor)

@ktegan I like a happy customer. My previous advice, to set the same tasksupport on both collections, is a simple short-term solution. That will give you 2.12.4 behavior with respect to this issue.

@ktegan (Author) commented Aug 24, 2018

But if the body of the map calls functions that call other functions that call other functions, I have to pass the outer collection's tasksupport through all of them. Basically, any function that could be called inside a parallel collection now has to take an additional tasksupport parameter to get the old behavior.
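
One hypothetical way to lighten that plumbing (my sketch, assuming you control the helpers' signatures) is to carry the shared TaskSupport as an implicit parameter:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.{ForkJoinTaskSupport, TaskSupport}

// hypothetical helper deep in the call chain: it parallelizes on
// whatever shared TaskSupport is in implicit scope
def innerWork(xs: Seq[Int])(implicit ts: TaskSupport): Seq[Int] = {
  val p = xs.par
  p.tasksupport = ts
  p.map(_ + 1).seq
}

implicit val shared: TaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
innerWork(0 until 20) // picks up `shared` without an explicit argument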

@som-snytt (Contributor)

That is the issue at hand, @ktegan.

@ktegan (Author) commented Aug 24, 2018

@som-snytt I'm not positive I understand your comment, but I will assume for now you agree with my assessment. Please let me know if I'm mistaken.

@SethTisue (Member) commented Aug 24, 2018

@ktegan thanks for the report and investigation. having this discussion to refer to could be invaluable to anyone else who runs into this. thx @som-snytt and @viktorklang as well

SethTisue transferred this issue from scala/bug on Dec 20, 2024