Skip to content

Commit d096eea

Browse files
committed
Add explanation document and align implementation with it
1 parent 503ec8b commit d096eea

File tree

6 files changed

+577
-53
lines changed

6 files changed

+577
-53
lines changed

tests/run/suspend-strawman-2/Async.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ trait Async:
1919
object Async:
2020

2121
/** The underlying configuration of an async block */
22-
case class Config(scheduler: ExecutionContext, group: Cancellable.Group)
22+
case class Config(scheduler: ExecutionContext, group: CancellationGroup)
2323

2424
trait LowPrioConfig:
2525

2626
/** A toplevel async group with given scheduler and a synthetic root that
2727
* ignores cancellation requests
2828
*/
2929
given fromExecutionContext(using scheduler: ExecutionContext): Config =
30-
Config(scheduler, Cancellable.Unlinked)
30+
Config(scheduler, CancellationGroup.Unlinked)
3131

3232
end LowPrioConfig
3333

@@ -69,14 +69,14 @@ object Async:
6969
inline def await[T](src: Source[T])(using async: Async): T = async.await(src)
7070

7171
def group[T](body: Async ?=> T)(using async: Async): T =
72-
val newGroup = Cancellable.Group().link()
72+
val newGroup = CancellationGroup().link()
7373
body(using async.withConfig(async.config.copy(group = newGroup)))
7474

7575
/** A function `T => Boolean` whose lineage is recorded by its implementing
7676
* classes. The Listener function accepts values of type `T` and returns
7777
* `true` iff the value was consumed by an async block.
7878
*/
79-
trait Listener[-T] extends Function[T, Boolean]
79+
trait Listener[-T] extends (T => Boolean)
8080

8181
/** A listener for values that are processed by the given source `src` and
8282
* that are demanded by the continuation listener `continue`.

tests/run/suspend-strawman-2/Cancellable.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ import scala.collection.mutable
44
/** A trait for cancellable entities that can be grouped */
55
trait Cancellable:
66

7-
private var group: Cancellable.Group = Cancellable.Unlinked
7+
private var group: CancellationGroup = CancellationGroup.Unlinked
88

99
/** Issue a cancel request */
1010
def cancel(): Unit
1111

1212
/** Add this cancellable to the given group after removing
1313
* it from the previous group in which it was.
1414
*/
15-
def link(group: Cancellable.Group): this.type =
15+
def link(group: CancellationGroup): this.type =
1616
this.group.drop(this)
1717
this.group = group
1818
this.group.add(this)
@@ -26,37 +26,37 @@ trait Cancellable:
2626

2727
/** Unlink this cancellable from its group. */
2828
def unlink(): this.type =
29-
link(Cancellable.Unlinked)
29+
link(CancellationGroup.Unlinked)
3030

31-
object Cancellable:
31+
end Cancellable
32+
33+
class CancellationGroup extends Cancellable:
34+
private var members: mutable.Set[Cancellable] = mutable.Set()
3235

33-
/** A group of cancellable members */
34-
class Group extends Cancellable:
35-
private var members: mutable.Set[Cancellable] = mutable.Set()
36+
/** Cancel all members and clear the members set */
37+
def cancel() =
38+
members.toArray.foreach(_.cancel())
39+
members.clear()
3640

37-
/** Cancel all members and clear the members set */
38-
def cancel() =
39-
members.toArray.foreach(_.cancel())
40-
members.clear()
41+
/** Add given member to the members set */
42+
def add(member: Cancellable): Unit = synchronized:
43+
members += member
4144

42-
/** Add given member to the members set */
43-
def add(member: Cancellable): Unit = synchronized:
44-
members += member
45+
/** Remove given member from the members set if it is an element */
46+
def drop(member: Cancellable): Unit = synchronized:
47+
members -= member
4548

46-
/** Remove given member from the members set if it is an element */
47-
def drop(member: Cancellable): Unit = synchronized:
48-
members -= member
49-
end Group
49+
object CancellationGroup:
5050

5151
/** A sentinal group of cancellables that are in fact not linked
5252
* to any real group. `cancel`, `add`, and `drop` do nothing when
5353
* called on this group.
5454
*/
55-
object Unlinked extends Group:
55+
object Unlinked extends CancellationGroup:
5656
override def cancel() = ()
5757
override def add(member: Cancellable): Unit = ()
5858
override def drop(member: Cancellable): Unit = ()
5959
end Unlinked
6060

61-
end Cancellable
61+
end CancellationGroup
6262

tests/run/suspend-strawman-2/channels.scala

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,25 @@ import fiberRuntime.suspend
55
import scala.concurrent.ExecutionContext
66
import Async.{Listener, await}
77

8+
/** A common interface for channels */
9+
trait Channel[T]:
10+
def read()(using Async): T
11+
def send(x: T)(using Async): Unit
12+
def close(): Unit
13+
14+
class ChannelClosedException extends Exception
15+
816
/** An unbounded asynchronous channel. Senders do not wait for matching
917
* readers.
1018
*/
11-
class UnboundedChannel[T] extends Async.OriginalSource[T]:
19+
class AsyncChannel[T] extends Async.OriginalSource[T], Channel[T]:
1220

1321
private val pending = ListBuffer[T]()
1422
private val waiting = mutable.Set[Listener[T]]()
23+
private var isClosed = false
24+
25+
private def ensureOpen() =
26+
if isClosed then throw ChannelClosedException()
1527

1628
private def drainWaiting(x: T): Boolean =
1729
waiting.iterator.find(_(x)) match
@@ -30,11 +42,13 @@ class UnboundedChannel[T] extends Async.OriginalSource[T]:
3042
def read()(using Async): T = synchronized:
3143
await(this)
3244

33-
def send(x: T): Unit = synchronized:
45+
def send(x: T)(using Async): Unit = synchronized:
46+
ensureOpen()
3447
val sent = pending.isEmpty && drainWaiting(x)
3548
if !sent then pending += x
3649

3750
def poll(k: Listener[T]): Boolean = synchronized:
51+
ensureOpen()
3852
drainPending(k)
3953

4054
def addListener(k: Listener[T]): Unit = synchronized:
@@ -43,7 +57,10 @@ class UnboundedChannel[T] extends Async.OriginalSource[T]:
4357
def dropListener(k: Listener[T]): Unit = synchronized:
4458
waiting -= k
4559

46-
end UnboundedChannel
60+
def close() =
61+
isClosed = true
62+
63+
end AsyncChannel
4764

4865
/** An unbuffered, synchronous channel. Senders and readers both block
4966
* until a communication between them happens. The channel provides two
@@ -52,7 +69,7 @@ end UnboundedChannel
5269
* waiting sender the data is transmitted directly. Otherwise we add
5370
* the operation to the corresponding pending set.
5471
*/
55-
trait SyncChannel[T]:
72+
trait SyncChannel[T] extends Channel[T]:
5673

5774
val canRead: Async.Source[T]
5875
val canSend: Async.Source[Listener[T]]
@@ -67,8 +84,13 @@ object SyncChannel:
6784

6885
private val pendingReads = mutable.Set[Listener[T]]()
6986
private val pendingSends = mutable.Set[Listener[Listener[T]]]()
87+
private var isClosed = false
88+
89+
private def ensureOpen() =
90+
if isClosed then throw ChannelClosedException()
7091

7192
private def link[T](pending: mutable.Set[T], op: T => Boolean): Boolean =
93+
ensureOpen()
7294
// Since sources are filterable, we have to match all pending readers or writers
7395
// against the incoming request
7496
pending.iterator.find(op) match
@@ -95,6 +117,9 @@ object SyncChannel:
95117
def dropListener(k: Listener[Listener[T]]): Unit = synchronized:
96118
pendingSends -= k
97119

120+
def close() =
121+
isClosed = true
122+
98123
end SyncChannel
99124

100125
def TestChannel(using ExecutionContext) =

0 commit comments

Comments
 (0)