Skip to content

Commit 24a5713

Browse files
committed
Addresses both performance and correctness aspects of Future
* A regression was introduced since one of the tests in FutureSpec was comparing the wrong things, not noticing that regression led to design considerations which are in hindsight invalid. For that reason scala.concurrent. Transformation now stores its ExecutionContext for longer, this in order to make sure that we can use ec.reportFailure on NonFatal Throwables thrown when executing Future.foreach and Future.onComplete—this to live up to their contract, and to facilitate easier debugging of user code which uses Future. * I noticed that a typo had been introduced when changing between successful and failed branches when submitting Runnables to synchronous BatchingExecutors—this typo led to not activating the fast-path of that code, which yields a significant performance by delaying the inflation of allocating a Batch. * Execution of batched Runnables for synchronous BatchingExecutors has now been slightly improved due to reducing reads. * Submitting Runnables to BatchingExectuor now null-checks all Runnables, before would have cases where checks were not performed. * Improves performance of completeWith by avoiding to allocate the function which would attempt to do the actual completion.
1 parent 5654813 commit 24a5713

File tree

4 files changed

+112
-108
lines changed

4 files changed

+112
-108
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,7 @@
5050
/project/project/target/
5151
/project/project/project/target/
5252
/test/macro-annot/target/
53+
/test/files/target/
54+
/test/target/
5355
/build-sbt/
5456
local.sbt

src/library/scala/concurrent/BatchingExecutor.scala

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ package scala.concurrent
1515
import java.util.concurrent.Executor
1616
import java.util.Objects
1717
import scala.util.control.NonFatal
18-
import scala.annotation.tailrec
18+
import scala.annotation.{switch, tailrec}
1919

2020
/**
2121
* Marker trait to indicate that a Runnable is Batchable by BatchingExecutors
@@ -24,7 +24,7 @@ trait Batchable {
2424
self: Runnable =>
2525
}
2626

27-
private[concurrent] final object BatchingExecutorStatics {
27+
private[concurrent] object BatchingExecutorStatics {
2828
final val emptyBatchArray: Array[Runnable] = new Array[Runnable](0)
2929
final val marker = ""
3030
final object MissingParentBlockContext extends BlockContext {
@@ -111,21 +111,23 @@ private[concurrent] trait BatchingExecutor extends Executor {
111111
this.size = sz + 1
112112
}
113113

114-
final def pop(): Runnable = {
115-
val sz = this.size
116-
if (sz < 2) {
117-
val ret = this.first
118-
this.first = null
119-
this.size = 0
120-
ret
121-
} else {
122-
val o = this.other
123-
val ret = o(sz - 2)
124-
o(sz - 2) = null
125-
this.size = sz - 1
126-
ret
127-
}
128-
}
114+
private[this] final def runNext(): Boolean =
115+
(this.size: @switch) match {
116+
case 0 => false
117+
case 1 =>
118+
val next = this.first
119+
this.first = null
120+
this.size = 0
121+
next.run()
122+
this.size > 0// Could have changed during next.run()
123+
case sz =>
124+
val o = this.other
125+
val next = o(sz - 2)
126+
o(sz - 2) = null
127+
this.size = sz - 1// Important to update prior to `r.run()`
128+
next.run()
129+
this.size > 0// Could have changed during next.run()
130+
}
129131

130132
// This method runs in the delegate ExecutionContext's thread
131133
override final def run(): Unit = {
@@ -135,7 +137,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
135137
if (resubmitOnBlock) BlockContext.usingBlockContext(this)(this)
136138
else runWithoutResubmit(runUntilFailureOrDone())
137139

138-
_tasksLocal.remove()
140+
_tasksLocal.set(BatchingExecutorStatics.marker)
139141
if (failure != null)
140142
throw handleRunFailure(failure)
141143
}
@@ -155,8 +157,7 @@ private[concurrent] trait BatchingExecutor extends Executor {
155157

156158
private[this] final def runUntilFailureOrDone(): Throwable =
157159
try {
158-
while(size > 0)
159-
pop().run()
160+
while(runNext()) {}
160161

161162
null
162163
} catch {
@@ -201,16 +202,15 @@ private[concurrent] trait BatchingExecutor extends Executor {
201202
*/
202203
protected def reportFailure(throwable: Throwable): Unit
203204

204-
override final def execute(runnable: Runnable): Unit =
205+
override final def execute(runnable: Runnable): Unit = {
206+
Objects.requireNonNull(runnable, "runnable is null")
205207
if (isAsync) {
206208
if (batchable(runnable)) {
207-
// We don't check if `runnable` is null here because if it is, it will be sent to `submitAsync` which will check that in the implementation(s)
208209
val b = _tasksLocal.get
209210
if (b.isInstanceOf[Batch]) b.asInstanceOf[Batch].push(runnable)
210211
else submitAsync(new Batch(runnable, resubmitOnBlock = true))
211212
} else submitAsync(runnable)
212213
} else {
213-
Objects.requireNonNull(runnable, "runnable is null")
214214
val b = _tasksLocal.get
215215
if (b.isInstanceOf[Batch]) b.asInstanceOf[Batch].push(runnable)
216216
else if (b == null) { // If there is null in _tasksLocal, set a marker and run, inflate the Batch only if needed
@@ -219,4 +219,5 @@ private[concurrent] trait BatchingExecutor extends Executor {
219219
_tasksLocal.remove() // Since we are executing synchronously, we can clear this at the end of execution
220220
} else new Batch(runnable, resubmitOnBlock = false).run()
221221
}
222+
}
222223
}

src/library/scala/concurrent/impl/Promise.scala

Lines changed: 78 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private[concurrent] object Promise {
101101
}
102102

103103
// Left non-final to enable addition of extra fields by Java/Scala converters in scala-java8-compat.
104-
class DefaultPromise[T] private[this] (initial: AnyRef) extends AtomicReference[AnyRef](initial) with scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
104+
class DefaultPromise[T] private[this] (initial: AnyRef) extends AtomicReference[AnyRef](initial) with scala.concurrent.Promise[T] with scala.concurrent.Future[T] with (Try[T] => Unit) {
105105
/**
106106
* Constructs a new, completed, Promise.
107107
*/
@@ -112,6 +112,13 @@ private[concurrent] object Promise {
112112
*/
113113
final def this() = this(Noop: AnyRef)
114114

115+
/**
116+
* WARNING: the `resolved` value needs to have been pre-resolved using `resolve()`
117+
* INTERNAL API
118+
*/
119+
override final def apply(resolved: Try[T]): Unit =
120+
tryComplete0(get(), resolved)
121+
115122
/**
116123
* Returns the associated `Future` with this `Promise`
117124
*/
@@ -248,11 +255,13 @@ private[concurrent] object Promise {
248255
} else /* if(state.isInstanceOf[Try[T]]) */ false
249256

250257
override final def completeWith(other: Future[T]): this.type = {
251-
val state = get()
252-
if ((other ne this) && !state.isInstanceOf[Try[T]]) {
253-
val resolved = if (other.isInstanceOf[DefaultPromise[T]]) other.asInstanceOf[DefaultPromise[T]].value0 else null
254-
if (resolved ne null) tryComplete0(state, resolved)
255-
else super.completeWith(other)
258+
if (other ne this) {
259+
val state = get()
260+
if (!state.isInstanceOf[Try[T]]) {
261+
val resolved = if (other.isInstanceOf[DefaultPromise[T]]) other.asInstanceOf[DefaultPromise[T]].value0 else other.value.orNull
262+
if (resolved ne null) tryComplete0(state, resolved)
263+
else other.onComplete(this)(InternalCallbackExecutor)
264+
}
256265
}
257266

258267
this
@@ -362,10 +371,11 @@ private[concurrent] object Promise {
362371
**/
363372
final class Transformation[-F, T] private[this] (
364373
private[this] final var _fun: Any => Any,
365-
private[this] final var _arg: AnyRef,
374+
private[this] final var _ec: ExecutionContext,
375+
private[this] final var _arg: Try[F],
366376
private[this] final val _xform: Byte
367377
) extends DefaultPromise[T]() with Callbacks[F] with Runnable with Batchable with OnCompleteRunnable {
368-
final def this(xform: Int, f: _ => _, ec: ExecutionContext) = this(f.asInstanceOf[Any => Any], ec.prepare(): AnyRef, xform.toByte)
378+
final def this(xform: Int, f: _ => _, ec: ExecutionContext) = this(f.asInstanceOf[Any => Any], ec.prepare(), null, xform.toByte)
369379

370380
final def benefitsFromBatching: Boolean = _xform != Xform_onComplete && _xform != Xform_foreach
371381

@@ -374,9 +384,9 @@ private[concurrent] object Promise {
374384
// Invariant: _arg is `ExecutionContext`, and non-null. `this` ne Noop.
375385
// requireNonNull(resolved) will hold as guarded by `resolve`
376386
final def submitWithValue(resolved: Try[F]): this.type = {
377-
val e = _arg.asInstanceOf[ExecutionContext]
378387
_arg = resolved
379-
try e.execute(this) /* Safe publication of _arg and _fun */
388+
val e = _ec
389+
try e.execute(this) /* Safe publication of _arg, _fun, _ec */
380390
catch {
381391
case t: Throwable => handleFailure(t, e)
382392
}
@@ -387,81 +397,77 @@ private[concurrent] object Promise {
387397
private[this] final def handleFailure(t: Throwable, e: ExecutionContext): Unit = {
388398
_fun = null // allow to GC
389399
_arg = null // see above
400+
_ec = null // see above again
390401
val wasInterrupted = t.isInstanceOf[InterruptedException]
391402
if (wasInterrupted || NonFatal(t)) {
392403
val completed = tryComplete0(get(), resolve(Failure(t)))
393404
if (completed && wasInterrupted) Thread.currentThread.interrupt()
394-
if (!completed && (e ne null)) e.reportFailure(t)
405+
406+
// Report or rethrow failures which are unlikely to otherwise be noticed
407+
if (_xform == Xform_foreach || _xform == Xform_onComplete || !completed)
408+
e.reportFailure(t)
395409
} else throw t
396410
}
397411

412+
@inline private[this] final def completeWithLink(f: Future[T]): Try[T] = {
413+
if (f.isInstanceOf[DefaultPromise[T]])
414+
f.asInstanceOf[DefaultPromise[T]].linkRootOf(this, null)
415+
else
416+
completeWith(f)
417+
418+
null
419+
}
420+
398421
// Gets invoked by the ExecutionContext, when we have a value to transform.
399-
// Invariant: if (_arg.isInstanceOf[Try[F]] && (_fun ne null))
400-
override final def run(): Unit =
422+
override final def run(): Unit = {
423+
val v = _arg
424+
val fun = _fun
425+
val ec = _ec
401426
try {
402-
val v = _arg.asInstanceOf[Try[F]]
403-
(_xform.toInt: @switch) match {
404-
case Xform_noop => doAbort(v)
405-
case Xform_map => doMap(v)
406-
case Xform_flatMap => doFlatMap(v)
407-
case Xform_transform => doTransform(v)
408-
case Xform_transformWith => doTransformWith(v)
409-
case Xform_foreach => v.foreach(_fun)
410-
case Xform_onComplete => _fun(v)
411-
case Xform_recover => doRecover(v)
412-
case Xform_recoverWith => doRecoverWith(v)
413-
case Xform_filter => doFilter(v)
414-
case Xform_collect => doCollect(v)
415-
case _ => doAbort(v)
416-
}
427+
val resolvedResult: Try[T] =
428+
(_xform.toInt: @switch) match {
429+
case Xform_noop =>
430+
null
431+
case Xform_map =>
432+
if (v.isInstanceOf[Success[F]]) Success(fun(v.asInstanceOf[Success[F]].value).asInstanceOf[T]) else v.asInstanceOf[Failure[T]]
433+
case Xform_flatMap =>
434+
if (v.isInstanceOf[Success[F]]) completeWithLink(fun(v.asInstanceOf[Success[F]].value).asInstanceOf[Future[T]])
435+
else v.asInstanceOf[Failure[T]] // Already resolved
436+
case Xform_transform =>
437+
resolve(fun(v).asInstanceOf[Try[T]])
438+
case Xform_transformWith =>
439+
completeWithLink(fun(v).asInstanceOf[Future[T]])
440+
case Xform_foreach =>
441+
if (v.isInstanceOf[Success[F]]) fun(v.asInstanceOf[Success[F]].value)
442+
null
443+
case Xform_onComplete =>
444+
fun(v)
445+
null
446+
case Xform_recover =>
447+
resolve(v.recover(fun.asInstanceOf[PartialFunction[Throwable, F]]).asInstanceOf[Try[T]]) //recover F=:=T
448+
case Xform_recoverWith =>
449+
if (v.isInstanceOf[Failure[F]]) {
450+
val f = fun.asInstanceOf[PartialFunction[Throwable, Future[T]]].applyOrElse(v.asInstanceOf[Failure[F]].exception, Future.recoverWithFailed)
451+
if (f ne Future.recoverWithFailedMarker) completeWithLink(f)
452+
else v.asInstanceOf[Failure[T]]
453+
} else v.asInstanceOf[Success[T]]
454+
case Xform_filter =>
455+
if (v.isInstanceOf[Failure[F]] || fun.asInstanceOf[F => Boolean](v.asInstanceOf[Success[F]].value)) v.asInstanceOf[Try[T]]
456+
else Future.filterFailure // Safe for unresolved completes
457+
case Xform_collect =>
458+
if (v.isInstanceOf[Success[F]]) Success(fun.asInstanceOf[PartialFunction[F, T]].applyOrElse(v.asInstanceOf[Success[F]].value, Future.collectFailed))
459+
else v.asInstanceOf[Failure[T]] // Already resolved
460+
case _ =>
461+
Failure(new IllegalStateException("BUG: encountered transformation promise with illegal type: " + _xform))
462+
}
463+
if (resolvedResult ne null)
464+
tryComplete0(get(), resolvedResult)
417465
_fun = null // allow to GC
418466
_arg = null // see above
467+
_ec = null // see above
419468
} catch {
420-
case t: Throwable => handleFailure(t, null)
469+
case t: Throwable => handleFailure(t, ec)
421470
}
422-
423-
private[this] final def doMap(v: Try[F]): Unit = { tryComplete0(get(), resolve(v.map(_fun.asInstanceOf[F => T]))) }
424-
425-
private[this] final def doFlatMap(v: Try[F]): Unit =
426-
if (v.isInstanceOf[Success[F]]) {
427-
val f = _fun(v.asInstanceOf[Success[F]].value)
428-
if(f.isInstanceOf[DefaultPromise[T]]) f.asInstanceOf[DefaultPromise[T]].linkRootOf(this, null)
429-
else completeWith(f.asInstanceOf[Future[T]])
430-
} else tryComplete0(get(), v.asInstanceOf[Try[T]]) // Already resolved
431-
432-
private[this] final def doTransform(v: Try[F]): Unit = tryComplete0(get(), resolve(_fun(v).asInstanceOf[Try[T]]))
433-
434-
private[this] final def doTransformWith(v: Try[F]): Unit = {
435-
val f = _fun(v)
436-
if(f.isInstanceOf[DefaultPromise[T]]) f.asInstanceOf[DefaultPromise[T]].linkRootOf(this, null)
437-
else completeWith(f.asInstanceOf[Future[T]])
438471
}
439-
440-
private[this] final def doRecover(v: Try[F]): Unit =
441-
tryComplete0(get(), resolve(v.recover(_fun.asInstanceOf[PartialFunction[Throwable, F]]).asInstanceOf[Try[T]])) //recover F=:=T
442-
443-
private[this] final def doRecoverWith(v: Try[F]): Unit = //recoverWith F=:=T
444-
if (v.isInstanceOf[Failure[F]]) {
445-
val f = _fun.asInstanceOf[PartialFunction[Throwable, Future[T]]].applyOrElse(v.asInstanceOf[Failure[F]].exception, Future.recoverWithFailed)
446-
if (f ne Future.recoverWithFailedMarker) {
447-
if(f.isInstanceOf[DefaultPromise[T]]) f.asInstanceOf[DefaultPromise[T]].linkRootOf(this, null)
448-
else completeWith(f)
449-
} else tryComplete0(get(), v.asInstanceOf[Failure[T]])
450-
} else tryComplete0(get(), v.asInstanceOf[Try[T]])
451-
452-
private[this] final def doFilter(v: Try[F]): Unit =
453-
tryComplete0(get(),
454-
if (v.isInstanceOf[Failure[F]] || _fun.asInstanceOf[F => Boolean](v.asInstanceOf[Success[F]].value)) v.asInstanceOf[Try[T]]
455-
else Future.filterFailure // Safe for unresolved completes
456-
)
457-
458-
private[this] final def doCollect(v: Try[F]): Unit =
459-
tryComplete0(get(),
460-
if (v.isInstanceOf[Success[F]]) Success(_fun.asInstanceOf[PartialFunction[F, T]].applyOrElse(v.asInstanceOf[Success[F]].value, Future.collectFailed))
461-
else v.asInstanceOf[Try[T]] // Already resolved
462-
)
463-
464-
private[this] final def doAbort(v: Try[F]): Unit =
465-
tryComplete0(get(), Failure(new IllegalStateException("BUG: encountered transformation promise with illegal type: " + _xform)))
466472
}
467473
}

test/files/jvm/future-spec/FutureTests.scala

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class FutureTests extends MinimalScalaTest {
3737
"A future with custom ExecutionContext" should {
3838
"shouldHandleThrowables" in {
3939
val ms = new concurrent.TrieMap[Throwable, Unit]
40-
implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(), {
40+
implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(1), {
4141
t =>
4242
ms.addOne((t, ()))
4343
})
@@ -57,28 +57,23 @@ class FutureTests extends MinimalScalaTest {
5757
Await.ready(latch, 5 seconds)
5858
"success"
5959
}
60-
val f3 = f2 map { s => s.toUpperCase }
6160

6261
f2 foreach { _ => throw new ThrowableTest("dispatcher foreach") }
63-
f2 onComplete { case Success(_) => throw new ThrowableTest("dispatcher receive"); case _ => }
62+
f2 onComplete { case Success(_) => throw new ThrowableTest("dispatcher onComplete"); case _ => }
6463

6564
latch.open()
6665

6766
Await.result(f2, defaultTimeout) mustBe ("success")
6867

6968
f2 foreach { _ => throw new ThrowableTest("current thread foreach") }
70-
f2 onComplete { case Success(_) => throw new ThrowableTest("current thread receive"); case _ => }
69+
f2 onComplete { case Success(_) => throw new ThrowableTest("current thread onComplete"); case _ => }
7170

72-
Await.result(f3, defaultTimeout) mustBe ("SUCCESS")
71+
Await.result(f2 map { s => s.toUpperCase }, defaultTimeout) mustBe ("SUCCESS")
7372

74-
val waiting = Future {
75-
Thread.sleep(1000)
76-
}
77-
Await.ready(waiting, 4000 millis)
78-
79-
if (ms.size != 4)
80-
assert(ms.size != 4, "Expected 4 throwables, found: " + ms)
81-
//FIXME should check
73+
ms.size mustBe 4
74+
val msgs = ms.keysIterator.map(_.getMessage).toSet
75+
val expectedMsgs = Set("dispatcher foreach", "dispatcher onComplete", "current thread foreach", "current thread onComplete")
76+
msgs mustBe expectedMsgs
8277
}
8378
}
8479

0 commit comments

Comments
 (0)