Skip to content

Commit 6514830

Browse files
committed
Improves performance and fairness of BatchingExecutor
* Allows for max 16 Runnables to execute nested on stack * Allows for max 1024 Runnables to be executed before resubmit * Makes synchronous BatchingExecutors also use submitForExecution since this makes things easier to instrument.
1 parent bbda70d commit 6514830

File tree

4 files changed

+79
-63
lines changed

4 files changed

+79
-63
lines changed

src/library/scala/concurrent/BatchingExecutor.scala

Lines changed: 72 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,13 @@ trait Batchable {
2626

2727
private[concurrent] object BatchingExecutorStatics {
2828
final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)
29-
final val marker = ""
29+
30+
// Max number of Runnables executed nested before starting to batch (to prevent stack exhaustion)
31+
final val syncPreBatchDepth = 16
32+
33+
// Max number of Runnables processed in one go (to prevent starvation of other tasks on the pool)
34+
final val runLimit = 1024
35+
3036
final object MissingParentBlockContext extends BlockContext {
3137
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T =
3238
try thunk finally throw new IllegalStateException("BUG in BatchingExecutor.Batch: parentBlockContext is null")
@@ -117,31 +123,24 @@ private[concurrent] trait BatchingExecutor extends Executor {
117123
this.size = sz + 1
118124
}
119125

120-
@tailrec protected final def runAll(): Unit = // TODO: Impose max limit of number of items (fairness)
121-
(this.size: @switch) match {
122-
case 0 =>
123-
case 1 =>
124-
val next = this.first
125-
this.first = null
126-
this.size = 0
127-
next.run()
128-
runAll()
129-
case sz =>
130-
val o = this.other
131-
val next = o(sz - 2)
132-
o(sz - 2) = null
133-
this.size = sz - 1// Important to update prior to `r.run()`
134-
next.run()
135-
runAll()
136-
}
137-
138-
protected final def runUntilFailureOrDone(): Throwable =
139-
try {
140-
runAll()
141-
null
142-
} catch {
143-
case t: Throwable => t
144-
}
126+
@tailrec protected final def runN(n: Int): Unit =
127+
if (n > 0)
128+
(this.size: @switch) match {
129+
case 0 =>
130+
case 1 =>
131+
val next = this.first
132+
this.first = null
133+
this.size = 0
134+
next.run()
135+
runN(n - 1)
136+
case sz =>
137+
val o = this.other
138+
val next = o(sz - 2)
139+
o(sz - 2) = null
140+
this.size = sz - 1
141+
next.run()
142+
runN(n - 1)
143+
}
145144
}
146145

147146
private[this] final class AsyncBatch private(_first: Runnable, _other: Array[Runnable], _size: Int) extends AbstractBatch(_first, _other, _size) with Runnable with BlockContext with (BlockContext => Throwable) {
@@ -159,12 +158,15 @@ private[concurrent] trait BatchingExecutor extends Executor {
159158
}
160159

161160
/* LOGIC FOR ASYNCHRONOUS BATCHES */
162-
override final def apply(prevBlockContext: BlockContext): Throwable = {
161+
override final def apply(prevBlockContext: BlockContext): Throwable = try {
163162
parentBlockContext = prevBlockContext
164-
val failure = runUntilFailureOrDone()
163+
runN(BatchingExecutorStatics.runLimit)
164+
null
165+
} catch {
166+
case t: Throwable => t // We are handling exceptions on the outside of this method
167+
} finally {
165168
parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
166169
_tasksLocal.remove()
167-
failure
168170
}
169171

170172
/* Attempts to resubmit this Batch to the underlying ExecutionContext,
@@ -174,7 +176,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
174176
*/
175177
private[this] final def resubmit(cause: Throwable): Throwable =
176178
if (this.size > 0) {
177-
try { submitAsync(this); cause } catch {
179+
try { submitForExecution(this); cause } catch {
178180
case inner: Throwable =>
179181
if (NonFatal(inner)) {
180182
val e = new ExecutionException("Non-fatal error occurred and resubmission failed, see suppressed exception.", cause)
@@ -185,7 +187,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
185187
} else cause // TODO: consider if NonFatals should simply be `reportFailure`:ed rather than rethrown
186188

187189
private[this] final def cloneAndClear(): AsyncBatch = {
188-
val newBatch = new AsyncBatch(first, other, size)
190+
val newBatch = new AsyncBatch(this.first, this.other, this.size)
189191
this.first = null
190192
this.parentBlockContext = BatchingExecutorStatics.MissingParentBlockContext
191193
this.other = BatchingExecutorStatics.emptyBatchArray
@@ -196,56 +198,70 @@ private[concurrent] trait BatchingExecutor extends Executor {
196198
override final def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
197199
val pbc = parentBlockContext // Store this for later since `cloneAndClear()` will reset it
198200

199-
if(this.size > 0) // If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
200-
submitAsync(cloneAndClear()) // If this throws then we have bigger problems
201+
// If we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
202+
if(this.size > 0)
203+
submitForExecution(cloneAndClear()) // If this throws then we have bigger problems
201204

202205
pbc.blockOn(thunk) // Now delegate the blocking to the previous BC
203206
}
204207
}
205208

206209
private[this] final class SyncBatch(runnable: Runnable) extends AbstractBatch(runnable, BatchingExecutorStatics.emptyBatchArray, 1) with Runnable {
207-
@tailrec private[this] final def runWithoutResubmit(failure: Throwable): Throwable =
208-
if (failure != null && (failure.isInstanceOf[InterruptedException] || NonFatal(failure))) {
209-
reportFailure(failure)
210-
runWithoutResubmit(runUntilFailureOrDone())
211-
} else {
212-
_tasksLocal.set(BatchingExecutorStatics.marker)
213-
failure
210+
@tailrec override final def run(): Unit = {
211+
try runN(BatchingExecutorStatics.runLimit) catch {
212+
case ie: InterruptedException =>
213+
reportFailure(ie) // TODO: Handle InterruptedException differently?
214+
case f if NonFatal(f) =>
215+
reportFailure(f)
214216
}
215217

216-
override final def run(): Unit = {
217-
_tasksLocal.set(this) // This is later cleared in `runWithoutResubmit`
218-
219-
val f = runWithoutResubmit(runUntilFailureOrDone())
220-
221-
if (f != null)
222-
throw f
218+
if (this.size > 0)
219+
run()
223220
}
224221
}
225222

226-
/** SHOULD throw a NullPointerException when `runnable` is null
223+
/** MUST throw a NullPointerException when `runnable` is null
224+
* When implementing a sync BatchingExecutor, it is RECOMMENDED
225+
* to implement this method as `runnable.run()`
227226
*/
228-
protected def submitAsync(runnable: Runnable): Unit
227+
protected def submitForExecution(runnable: Runnable): Unit
229228

230229
/** Reports that an asynchronous computation failed.
231230
* See `ExecutionContext.reportFailure(throwable: Throwable)`
232231
*/
233232
protected def reportFailure(throwable: Throwable): Unit
234233

234+
/**
235+
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
236+
* implementation of `BatchingExecutor`
237+
*/
235238
protected final def submitAsyncBatched(runnable: Runnable): Unit = {
236239
val b = _tasksLocal.get
237240
if (b.isInstanceOf[AsyncBatch]) b.asInstanceOf[AsyncBatch].push(runnable)
238-
else submitAsync(new AsyncBatch(runnable))
241+
else submitForExecution(new AsyncBatch(runnable))
239242
}
240243

244+
/**
245+
* WARNING: Never use both `submitAsyncBatched` and `submitSyncBatched` in the same
246+
* implementation of `BatchingExecutor`
247+
*/
241248
protected final def submitSyncBatched(runnable: Runnable): Unit = {
242249
Objects.requireNonNull(runnable, "runnable is null")
243-
val b = _tasksLocal.get
250+
val tl = _tasksLocal
251+
val b = tl.get
244252
if (b.isInstanceOf[SyncBatch]) b.asInstanceOf[SyncBatch].push(runnable)
245-
else if (b == null) { // If there is null in _tasksLocal, set a marker and run, inflate the Batch only if needed
246-
_tasksLocal.set(BatchingExecutorStatics.marker) // Set a marker to indicate that we are submitting synchronously
247-
runnable.run() // If we observe a non-null task which isn't a batch here, then allocate a batch
248-
_tasksLocal.remove() // Since we are executing synchronously, we can clear this at the end of execution
249-
} else new SyncBatch(runnable).run()
253+
else {
254+
val i = if (b ne null) b.asInstanceOf[java.lang.Integer].intValue else 0
255+
if (i < BatchingExecutorStatics.syncPreBatchDepth) {
256+
tl.set(java.lang.Integer.valueOf(i + 1))
257+
try submitForExecution(runnable) // User code so needs to be try-finally guarded here
258+
finally tl.set(b)
259+
} else {
260+
val batch = new SyncBatch(runnable)
261+
tl.set(batch)
262+
submitForExecution(batch)
263+
tl.set(b) // Batch only throws fatals so no need for try-finally here
264+
}
265+
}
250266
}
251267
}

src/library/scala/concurrent/Future.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ object Future {
867867
// doesn't need to create defaultExecutionContext as
868868
// a side effect.
869869
private[concurrent] object InternalCallbackExecutor extends ExecutionContextExecutor with BatchingExecutor {
870-
override final def submitAsync(runnable: Runnable): Unit = reportFailure(null) // Cannot submit async
870+
override final def submitForExecution(runnable: Runnable): Unit = runnable.run()
871871
final override def execute(runnable: Runnable): Unit = submitSyncBatched(runnable)
872872
override final def reportFailure(t: Throwable): Unit =
873873
ExecutionContext.defaultReporter(new IllegalStateException("problem in scala.concurrent internal callback", t))

src/library/scala/concurrent/impl/ExecutionContextImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,13 @@ private[concurrent] object ExecutionContextImpl {
101101
uncaught = (thread: Thread, cause: Throwable) => reporter(cause))
102102

103103
new ForkJoinPool(desiredParallelism, threadFactory, threadFactory.uncaught, true) with ExecutionContextExecutorService with BatchingExecutor {
104-
final override def submitAsync(runnable: Runnable): Unit = super[ForkJoinPool].execute(runnable)
104+
final override def submitForExecution(runnable: Runnable): Unit = super[ForkJoinPool].execute(runnable)
105105

106106
final override def execute(runnable: Runnable): Unit =
107107
if ((!runnable.isInstanceOf[Promise.Transformation[_,_]] || runnable.asInstanceOf[Promise.Transformation[_,_]].benefitsFromBatching) && runnable.isInstanceOf[Batchable])
108108
submitAsyncBatched(runnable)
109109
else
110-
submitAsync(runnable)
110+
submitForExecution(runnable)
111111

112112
final override def reportFailure(cause: Throwable): Unit =
113113
getUncaughtExceptionHandler() match {

test/benchmarks/src/main/scala/scala/concurrent/FutureBenchmark.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,24 @@ abstract class AbstractBaseFutureBenchmark {
3636
executionContext = pool match {
3737
case "fjp" =>
3838
val fjp = new ForkJoinPool(threads) with ExecutionContext with BatchingExecutor {
39-
final override def submitAsync(runnable: Runnable): Unit = super[ForkJoinPool].execute(runnable)
39+
final override def submitForExecution(runnable: Runnable): Unit = super[ForkJoinPool].execute(runnable)
4040
final override def execute(runnable: Runnable): Unit =
4141
if ((!runnable.isInstanceOf[impl.Promise.Transformation[_,_]] || runnable.asInstanceOf[impl.Promise.Transformation[_,_]].benefitsFromBatching) && runnable.isInstanceOf[Batchable])
4242
submitAsyncBatched(runnable)
4343
else
44-
submitAsync(runnable)
44+
submitForExecution(runnable)
4545
override final def reportFailure(t: Throwable) = t.printStackTrace(System.err)
4646
}
4747
executorService = fjp // we want to close this
4848
fjp
4949
case "fix" =>
5050
val fix = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]()) with ExecutionContext with BatchingExecutor {
51-
final override def submitAsync(runnable: Runnable): Unit = super[ThreadPoolExecutor].execute(runnable)
51+
final override def submitForExecution(runnable: Runnable): Unit = super[ThreadPoolExecutor].execute(runnable)
5252
final override def execute(runnable: Runnable): Unit =
5353
if ((!runnable.isInstanceOf[impl.Promise.Transformation[_,_]] || runnable.asInstanceOf[impl.Promise.Transformation[_,_]].benefitsFromBatching) && runnable.isInstanceOf[Batchable])
5454
submitAsyncBatched(runnable)
5555
else
56-
submitAsync(runnable)
56+
submitForExecution(runnable)
5757
override final def reportFailure(t: Throwable) = t.printStackTrace(System.err)
5858
}
5959
executorService = fix // we want to close this

0 commit comments

Comments
 (0)