Skip to content

Commit 7819b61

Browse files
committed
Refactoring of Cancellables
1 parent 294bf1d commit 7819b61

File tree

4 files changed

+118
-97
lines changed

4 files changed

+118
-97
lines changed

tests/run/suspend-strawman-2/Async.scala

Lines changed: 16 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package concurrent
22
import java.util.concurrent.atomic.AtomicBoolean
33
import scala.collection.mutable
4-
import fiberRuntime.suspend
5-
import fiberRuntime.boundary
64

75
/** A context that allows to suspend waiting for asynchronous data sources
86
*/
@@ -11,13 +9,16 @@ trait Async extends Async.Config:
119
/** Wait for completion of async source `src` and return the result */
1210
def await[T](src: Async.Source[T]): T
1311

12+
/** An Async of the same kind as this one, with a given cancellation group */
13+
def withGroup(group: Cancellable.Group): Async
14+
1415
object Async:
1516

1617
/** The underlying configuration of an async block */
1718
trait Config:
1819

19-
/** The cancellable async source underlying this async computation */
20-
def root: Cancellable
20+
/** The group of cancellableup to which nested futures belong */
21+
def group: Cancellable.Group
2122

2223
/** The scheduler for runnables defined in this async computation */
2324
def scheduler: Scheduler
@@ -28,59 +29,13 @@ object Async:
2829
* ignores cancellation requests
2930
*/
3031
given fromScheduler(using s: Scheduler): Config with
31-
def root = Cancellable.empty
32+
def group = Cancellable.Unlinked
3233
def scheduler = s
3334

3435
end Config
3536

36-
/** A possible implementation of Async. Defines an `await` method based
37-
* on a method to check for cancellation that needs to be implemented by
38-
* subclasses.
39-
*
40-
* @param root the root of the Async's config
41-
* @param scheduler the scheduler of the Async's config
42-
* @param label the label of the boundary that defines the representedd async block
43-
*/
44-
abstract class Impl(val root: Cancellable, val scheduler: Scheduler)
45-
(using label: boundary.Label[Unit]) extends Async:
46-
47-
protected def checkCancellation(): Unit
48-
49-
/** Await a source first by polling it, and, if that fails, by suspending
50-
* in a onComplete call.
51-
*/
52-
def await[T](src: Source[T]): T =
53-
checkCancellation()
54-
src.poll().getOrElse:
55-
try
56-
var result: Option[T] = None // Not needed if we have full continuations
57-
suspend[T, Unit]: k =>
58-
src.onComplete: x =>
59-
scheduler.schedule: () =>
60-
result = Some(x)
61-
k.resume()
62-
true // signals to `src` that result `x` was consumed
63-
result.get
64-
/* With full continuations, the try block can be written more simply as follows:
65-
66-
suspend[T, Unit]: k =>
67-
src.onComplete: x =>
68-
scheduler.schedule: () =>
69-
k.resume(x)
70-
true
71-
*/
72-
finally checkCancellation()
73-
74-
end Impl
75-
7637
/** An implementation of Async that blocks the running thread when waiting */
77-
private class Blocking(val scheduler: Scheduler = Scheduler) extends Async:
78-
79-
def root = Cancellable.empty
80-
81-
protected def checkCancellation(): Unit = ()
82-
83-
private var hasResumed = false
38+
private class Blocking(val scheduler: Scheduler, val group: Cancellable.Group) extends Async:
8439

8540
def await[T](src: Source[T]): T =
8641
src.poll().getOrElse:
@@ -94,18 +49,26 @@ object Async:
9449
while result.isEmpty do wait()
9550
result.get
9651

52+
def withGroup(group: Cancellable.Group) = Blocking(scheduler, group)
53+
end Blocking
54+
55+
9756
/** Execute asynchronous computation `body` on currently running thread.
9857
* The thread will suspend when the computation waits.
9958
*/
10059
def blocking[T](body: Async ?=> T, scheduler: Scheduler = Scheduler): T =
101-
body(using Blocking())
60+
body(using Blocking(scheduler, Cancellable.Unlinked))
10261

10362
/** The currently executing Async context */
10463
inline def current(using async: Async): Async = async
10564

10665
/** Await source result in currently executing Async context */
10766
inline def await[T](src: Source[T])(using async: Async): T = async.await(src)
10867

68+
def group[T](body: Async ?=> T)(using async: Async): T =
69+
val newGroup = Cancellable.Group().link()
70+
body(using async.withGroup(newGroup))
71+
10972
/** A function `T => Boolean` whose lineage is recorded by its implementing
11073
* classes. The Listener function accepts values of type `T` and returns
11174
* `true` iff the value was consumed by an async block.
Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,62 @@
11
package concurrent
2+
import scala.collection.mutable
23

34
/** A trait for cancellable entities that can be grouped */
45
trait Cancellable:
56

7+
private var group: Cancellable.Group = Cancellable.Unlinked
8+
69
/** Issue a cancel request */
710
def cancel(): Unit
811

9-
/** Add a given child to this Cancellable, so that the child will be cancelled
10-
* when the Cancellable itself is cancelled.
12+
/** Add this cancellable to the given group after removing
13+
* it from the previous group in which it was.
14+
*/
15+
def link(group: Cancellable.Group): this.type =
16+
this.group.drop(this)
17+
this.group = group
18+
this.group.add(this)
19+
this
20+
21+
/** Link this cancellable to the cancellable group of the
22+
* current async context.
1123
*/
12-
def addChild(child: Cancellable): Unit
24+
def link()(using ac: Async): this.type =
25+
link(ac.group)
26+
27+
/** Unlink this cancellable from its group. */
28+
def unlink(): this.type =
29+
link(Cancellable.Unlinked)
1330

1431
object Cancellable:
1532

16-
/** A cancelled entity that ignores all `cancel` and `addChild` requests */
17-
object empty extends Cancellable:
18-
def cancel() = ()
19-
def addChild(child: Cancellable) = ()
33+
/** A group of cancellable members */
34+
class Group extends Cancellable:
35+
private var members: mutable.Set[Cancellable] = mutable.Set()
36+
37+
/** Cancel all members and clear the members set */
38+
def cancel() =
39+
members.toArray.foreach(_.cancel())
40+
members.clear()
41+
42+
/** Add given member to the members set */
43+
def add(member: Cancellable): Unit = synchronized:
44+
members += member
45+
46+
/** Remove given member from the members set if it is an element */
47+
def drop(member: Cancellable): Unit = synchronized:
48+
members -= member
49+
end Group
50+
51+
/** A sentinal group of cancellables that are in fact not linked
52+
* to any real group. `cancel`, `add`, and `drop` do nothing when
53+
* called on this group.
54+
*/
55+
object Unlinked extends Group:
56+
override def cancel() = ()
57+
override def add(member: Cancellable): Unit = ()
58+
override def drop(member: Cancellable): Unit = ()
59+
end Unlinked
2060

2161
end Cancellable
62+

tests/run/suspend-strawman-2/futures.scala

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package concurrent
22

33
import scala.collection.mutable, mutable.ListBuffer
4+
import fiberRuntime.suspend
45
import fiberRuntime.boundary
56
import scala.compiletime.uninitialized
67
import scala.util.{Try, Success, Failure}
@@ -17,15 +18,10 @@ trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable:
1718
def value(using async: Async): T
1819

1920
/** Eventually stop computation of this future and fail with
20-
* a `Cancellation` exception. Also cancel all children.
21+
* a `Cancellation` exception.
2122
*/
2223
def cancel(): Unit
2324

24-
/** If this future has not yet completed, add `child` so that it will
25-
* be cancelled together with this future in case the future is cancelled.
26-
*/
27-
def addChild(child: Cancellable): Unit
28-
2925
object Future:
3026

3127
/** A future that is completed explicitly by calling its
@@ -37,14 +33,9 @@ object Future:
3733
private class CoreFuture[+T] extends Future[T]:
3834

3935
@volatile protected var hasCompleted: Boolean = false
36+
protected var cancelRequest = false
4037
private var result: Try[T] = uninitialized // guaranteed to be set if hasCompleted = true
4138
private val waiting: mutable.Set[Try[T] => Boolean] = mutable.Set()
42-
private val children: mutable.Set[Cancellable] = mutable.Set()
43-
44-
private def extract[T](s: mutable.Set[T]): List[T] = synchronized:
45-
val xs = s.toList
46-
s.clear()
47-
xs
4839

4940
// Async.Source method implementations
5041

@@ -60,16 +51,7 @@ object Future:
6051
// Cancellable method implementations
6152

6253
def cancel(): Unit =
63-
val othersToCancel = synchronized:
64-
if hasCompleted then Nil
65-
else
66-
result = Failure(new CancellationException())
67-
hasCompleted = true
68-
extract(children)
69-
othersToCancel.foreach(_.cancel())
70-
71-
def addChild(child: Cancellable): Unit = synchronized:
72-
if !hasCompleted then children += this
54+
cancelRequest = true
7355

7456
// Future method implementations
7557

@@ -86,30 +68,68 @@ object Future:
8668
* the type with which the future was created since `Promise` is invariant.
8769
*/
8870
private[Future] def complete(result: Try[T] @uncheckedVariance): Unit =
89-
if !hasCompleted then
90-
this.result = result
91-
hasCompleted = true
92-
for listener <- extract(waiting) do listener(result)
71+
val toNotify = synchronized:
72+
if hasCompleted then Nil
73+
else
74+
this.result = result
75+
hasCompleted = true
76+
val ws = waiting.toList
77+
waiting.clear()
78+
ws
79+
for listener <- toNotify do listener(result)
9380

9481
end CoreFuture
9582

9683
/** A future that is completed by evaluating `body` as a separate
9784
* asynchronous operation in the given `scheduler`
9885
*/
99-
private class RunnableFuture[+T](body: Async ?=> T)(using scheduler: Scheduler)
86+
private class RunnableFuture[+T](body: Async ?=> T)(using ac: Async.Config)
10087
extends CoreFuture[T]:
10188

10289
/** a handler for Async */
10390
private def async(body: Async ?=> Unit): Unit =
91+
class FutureAsync(val scheduler: Scheduler, val group: Cancellable.Group) extends Async:
92+
93+
def checkCancellation() =
94+
if cancelRequest then throw CancellationException()
95+
96+
/** Await a source first by polling it, and, if that fails, by suspending
97+
* in a onComplete call.
98+
*/
99+
def await[T](src: Async.Source[T]): T =
100+
checkCancellation()
101+
src.poll().getOrElse:
102+
try
103+
var result: Option[T] = None // Not needed if we have full continuations
104+
suspend[T, Unit]: k =>
105+
src.onComplete: x =>
106+
scheduler.schedule: () =>
107+
result = Some(x)
108+
k.resume()
109+
true // signals to `src` that result `x` was consumed
110+
result.get
111+
/* With full continuations, the try block can be written more simply as follows:
112+
113+
suspend[T, Unit]: k =>
114+
src.onComplete: x =>
115+
scheduler.schedule: () =>
116+
k.resume(x)
117+
true
118+
*/
119+
finally checkCancellation()
120+
121+
def withGroup(group: Cancellable.Group) = FutureAsync(scheduler, group)
122+
104123
boundary [Unit]:
105-
given Async = new Async.Impl(this, scheduler):
106-
def checkCancellation() =
107-
if hasCompleted then throw new CancellationException()
108-
body
124+
body(using FutureAsync(ac.scheduler, ac.group))
109125
end async
110126

111-
scheduler.schedule: () =>
112-
async(complete(Try(body)))
127+
ac.scheduler.schedule: () =>
128+
async:
129+
link()
130+
Async.group:
131+
complete(Try(body))
132+
unlink()
113133

114134
end RunnableFuture
115135

@@ -119,9 +139,7 @@ object Future:
119139
* children of that context's root.
120140
*/
121141
def apply[T](body: Async ?=> T)(using ac: Async.Config): Future[T] =
122-
val f = RunnableFuture(body)(using ac.scheduler)
123-
ac.root.addChild(f)
124-
f
142+
RunnableFuture(body)
125143

126144
extension [T](f1: Future[T])
127145

tests/run/suspend-strawman-2/scheduler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@ trait Scheduler:
55
def schedule(task: Runnable): Unit = task.run()
66

77
object Scheduler extends Scheduler:
8-
given fromAsyncConfig(using ac: Async.Config): Scheduler = ac.scheduler
98
end Scheduler
109

0 commit comments

Comments
 (0)