Skip to content

Commit b2dcd69

Browse files
committed
Deregister stale callbacks in Future.firstCompletedOf
1 parent 962562a commit b2dcd69

File tree

2 files changed

+52
-7
lines changed

2 files changed

+52
-7
lines changed

library/src/scala/concurrent/Future.scala

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ trait Future[+T] extends Awaitable[T] {
126126
*/
127127
def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit
128128

129+
/** The same as [[onComplete]], but additionally returns a function which can be
130+
* invoked to unregister the callback function. Removing a callback from a long-lived
131+
* future can enable garbage collection of objects referenced by the closure.
132+
*/
133+
private[concurrent] def onCompleteWithUnregister[U](f: Try[T] => U)(implicit executor: ExecutionContext): () => Unit
129134

130135
/* Miscellaneous */
131136

@@ -616,6 +621,7 @@ object Future {
616621
}
617622

618623
override final def onComplete[U](f: Try[Nothing] => U)(implicit executor: ExecutionContext): Unit = ()
624+
override private[concurrent] final def onCompleteWithUnregister[U](f: Try[Nothing] => U)(implicit executor: ExecutionContext): () => Unit = () => ()
619625
override final def isCompleted: Boolean = false
620626
override final def value: Option[Try[Nothing]] = None
621627
override final def failed: Future[Throwable] = this
@@ -732,15 +738,25 @@ object Future {
732738
if (!i.hasNext) Future.never
733739
else {
734740
val p = Promise[T]()
735-
val firstCompleteHandler = new AtomicReference[Promise[T]](p) with (Try[T] => Unit) {
736-
override final def apply(v1: Try[T]): Unit = {
737-
val r = getAndSet(null)
738-
if (r ne null)
739-
r tryComplete v1 // tryComplete is likely to be cheaper than complete
741+
val firstCompleteHandler = new AtomicReference(List.empty[() => Unit]) with (Try[T] => Unit) {
742+
final def apply(res: Try[T]): Unit = {
743+
val deregs = getAndSet(null)
744+
if (deregs != null) {
745+
p.tryComplete(res) // tryComplete is likely to be cheaper than complete
746+
deregs.foreach(_.apply())
747+
}
748+
}
749+
}
750+
var completed = false
751+
while (i.hasNext && !completed) {
752+
val deregs = firstCompleteHandler.get
753+
if (deregs == null) completed = true
754+
else {
755+
val d = i.next().onCompleteWithUnregister(firstCompleteHandler)
756+
if (!firstCompleteHandler.compareAndSet(deregs, d :: deregs))
757+
d.apply()
740758
}
741759
}
742-
while(i.hasNext && firstCompleteHandler.get != null) // exit early if possible
743-
i.next().onComplete(firstCompleteHandler)
744760
p.future
745761
}
746762
}

library/src/scala/concurrent/impl/Promise.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ private[concurrent] object Promise {
215215
override final def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit =
216216
dispatchOrAddCallbacks(get(), new Transformation[T, Unit](Xform_onComplete, func, executor))
217217

218+
override private[concurrent] final def onCompleteWithUnregister[U](func: Try[T] => U)(implicit executor: ExecutionContext): () => Unit = {
219+
val t = new Transformation[T, Unit](Xform_onComplete, func, executor)
220+
dispatchOrAddCallbacks(get(), t)
221+
() => unregisterCallback(t)
222+
}
223+
218224
override final def failed: Future[Throwable] =
219225
if (!get().isInstanceOf[Success[_]]) super.failed
220226
else Future.failedFailureFuture // Cached instance in case of already known success
@@ -319,6 +325,15 @@ private[concurrent] object Promise {
319325
p.dispatchOrAddCallbacks(p.get(), callbacks)
320326
}
321327

328+
@tailrec private def unregisterCallback(t: Transformation[_, _]): Unit = {
329+
val state = get()
330+
if (state eq t) {
331+
if (!compareAndSet(state, Noop)) unregisterCallback(t)
332+
} else if (state.isInstanceOf[ManyCallbacks[_]]) {
333+
if (!compareAndSet(state, removeCallback(state.asInstanceOf[ManyCallbacks[T]], t))) unregisterCallback(t)
334+
}
335+
}
336+
322337
// IMPORTANT: Noop should never be passed in here, neither as left OR as right
323338
@tailrec private[this] final def concatCallbacks(left: Callbacks[T], right: Callbacks[T]): Callbacks[T] =
324339
if (left.isInstanceOf[Transformation[T,_]]) new ManyCallbacks[T](left.asInstanceOf[Transformation[T,_]], right)
@@ -327,6 +342,20 @@ private[concurrent] object Promise {
327342
concatCallbacks(m.rest, new ManyCallbacks(m.first, right))
328343
}
329344

345+
@tailrec private[this] final def removeCallback(cs: Callbacks[T], t: Transformation[_, _], result: Callbacks[T] = null): AnyRef =
346+
if (cs eq t) {
347+
if (result == null) Noop
348+
else result
349+
}
350+
else if (cs.isInstanceOf[ManyCallbacks[_]]) {
351+
val m = cs.asInstanceOf[ManyCallbacks[T]]
352+
if (m.first eq t) {
353+
if (result == null) m.rest
354+
else concatCallbacks(m.rest, result)
355+
}
356+
else removeCallback(m.rest, t, if (result == null) m.first else new ManyCallbacks(m.first, result))
357+
} else cs
358+
330359
// IMPORTANT: Noop should not be passed in here, `callbacks` cannot be null
331360
@tailrec
332361
private[this] final def submitWithValue(callbacks: Callbacks[T], resolved: Try[T]): Unit =

0 commit comments

Comments
 (0)