Skip to content

Commit 802a024

Browse files
committed
Use home-brewed futures for parallel pickling
Uses just one thread for the rest of pickling. One thread is sufficient since there is not that much to do and we have time until the backend finishes. We might want to partially revise that decision when we support pipelined computation. In that case producing tasty early could be a win. But even in that case we might want to fine-tune the number of worker threads instead of relying on some executor. Adding more workers is easy in the new design.
1 parent 74c1924 commit 802a024

File tree

2 files changed

+79
-6
lines changed

2 files changed

+79
-6
lines changed

compiler/src/dotty/tools/dotc/transform/Pickler.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import Symbols._
1414
import Flags.Module
1515
import reporting.{ThrowingReporter, Profile, Message}
1616
import collection.mutable
17-
import scala.concurrent.{Future, Await, ExecutionContext}
18-
import scala.concurrent.duration.Duration
17+
import util.concurrent.{Executor, Future}
18+
import compiletime.uninitialized
1919

2020
object Pickler {
2121
val name: String = "pickler"
@@ -70,6 +70,11 @@ class Pickler extends Phase {
7070
body(scratch)
7171
}
7272

73+
private val executor = Executor[Array[Byte]]()
74+
75+
private def useExecutor(using Context) =
76+
Pickler.ParallelPickling && !ctx.settings.YtestPickler.value
77+
7378
override def run(using Context): Unit = {
7479
val unit = ctx.compilationUnit
7580
pickling.println(i"unpickling in run ${ctx.runId}")
@@ -123,10 +128,10 @@ class Pickler extends Phase {
123128
* function value.
124129
*/
125130
val demandPickled: () => Array[Byte] =
126-
if Pickler.ParallelPickling && !ctx.settings.YtestPickler.value then
127-
val futurePickled = Future(computePickled())(using ExecutionContext.global)
131+
if useExecutor then
132+
val futurePickled = executor.schedule(computePickled)
128133
() =>
129-
try Await.result(futurePickled, Duration.Inf)
134+
try futurePickled.force.get
130135
finally reportPositionWarnings()
131136
else
132137
val pickled = computePickled()
@@ -139,7 +144,13 @@ class Pickler extends Phase {
139144
}
140145

141146
override def runOn(units: List[CompilationUnit])(using Context): List[CompilationUnit] = {
142-
val result = super.runOn(units)
147+
val result =
148+
if useExecutor then
149+
executor.start()
150+
try super.runOn(units)
151+
finally executor.close()
152+
else
153+
super.runOn(units)
143154
if ctx.settings.YtestPickler.value then
144155
val ctx2 = ctx.fresh.setSetting(ctx.settings.YreadComments, true)
145156
testUnpickler(
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package dotty.tools.dotc.util
2+
import scala.util.{Try, Failure, Success}
3+
import scala.collection.mutable.ArrayBuffer
4+
5+
object concurrent:
6+
7+
class NoCompletion extends RuntimeException
8+
9+
class Future[T](exec: Executor[T]):
10+
private var result: Option[Try[T]] = None
11+
def force: Try[T] = synchronized {
12+
while result.isEmpty && exec.isAlive do wait(1000 /*ms*/)
13+
result.getOrElse(Failure(NoCompletion()))
14+
}
15+
def complete(r: Try[T]): Unit = synchronized {
16+
result = Some(r)
17+
notifyAll()
18+
}
19+
end Future
20+
21+
class Executor[T] extends Thread:
22+
private type WorkItem = (Future[T], () => T)
23+
24+
private var allScheduled = false
25+
private val pending = new ArrayBuffer[WorkItem]
26+
27+
def schedule(op: () => T): Future[T] = synchronized {
28+
assert(!allScheduled)
29+
val f = Future[T](this)
30+
pending += ((f, op))
31+
notifyAll()
32+
f
33+
}
34+
35+
def close(): Unit = synchronized {
36+
allScheduled = true
37+
notifyAll()
38+
}
39+
40+
private def nextPending(): Option[WorkItem] = synchronized {
41+
while pending.isEmpty && !allScheduled do wait(1000 /*ms*/)
42+
if pending.isEmpty then None
43+
else
44+
val item = pending.head
45+
pending.dropInPlace(1)
46+
Some(item)
47+
}
48+
49+
override def run(): Unit =
50+
while
51+
nextPending() match
52+
case Some((f, op)) =>
53+
f.complete(Try(op()))
54+
true
55+
case None =>
56+
false
57+
do ()
58+
end Executor
59+
end concurrent
60+
61+
62+

0 commit comments

Comments
 (0)