@@ -14,16 +14,26 @@ import Channel.{Closed, Res}
14
14
/** The part of a channel one can send values to. Blocking behavior depends on the implementation.
15
15
*/
16
16
trait SendableChannel [- T ]:
17
- /** Create an [[Async.Source ]] representing the send action of value [[x ]]. Note that *each* listener attached to and
18
- * accepting a [[Sent ]] value corresponds to [[x ]] being sent once.
17
+ /** Create an [[Async.Source ]] representing the send action of value `x`.
18
+ *
19
+ * Note that *each* listener attached to and accepting an [[Unit ]] value corresponds to `x` being sent once.
19
20
*
20
21
* To create an [[Async.Source ]] that sends the item exactly once regardless of listeners attached, wrap the [[send ]]
21
- * operation inside a [[gears.async.Future ]].
22
+ * operation inside a [[gears.async.Future ]]:
23
+ * {{{
24
+ * val sendOnce = Future(ch.send(x))
25
+ * }}}
26
+ *
27
+ * @return
28
+ * an [[Async.Source ]] that resolves with `Right(())` when `x` is sent to the channel, or `Left(Closed)` if the
29
+ * channel is already closed. This source will perform a send operation every time a listener is attached to it, or
30
+ * every time it is [[Async$.await ]]ed on.
22
31
*/
23
32
def sendSource (x : T ): Async .Source [Res [Unit ]]
24
33
25
- /** Send [[x ]] over the channel, blocking (asynchronously with [[Async ]]) until the item has been sent or, if the
26
- * channel is buffered, queued. Throws [[ChannelClosedException ]] if the channel was closed.
34
+ /** Send `x` over the channel, suspending until the item has been sent or, if the channel is buffered, queued.
35
+ * @throws ChannelClosedException
36
+ * if the channel was closed.
27
37
*/
28
38
def send (x : T )(using Async ): Unit = sendSource(x).awaitResult match
29
39
case Right (_) => ()
@@ -34,55 +44,98 @@ end SendableChannel
34
44
*/
35
45
trait ReadableChannel [+ T ]:
36
46
/** An [[Async.Source ]] corresponding to items being sent over the channel. Note that *each* listener attached to and
37
- * accepting a [[Read ]] value corresponds to one value received over the channel.
47
+ * accepting a [[Right ]] value corresponds to one value received over the channel.
38
48
*
39
49
* To create an [[Async.Source ]] that reads *exactly one* item regardless of listeners attached, wrap the [[read ]]
40
50
* operation inside a [[gears.async.Future ]].
51
+ * {{{
52
+ * val readOnce = Future(ch.read(x))
53
+ * }}}
41
54
*/
42
55
val readSource : Async .Source [Res [T ]]
43
56
44
- /** Read an item from the channel, blocking (asynchronously with [[ Async ]]) until the item has been received. Returns
57
+ /** Read an item from the channel, suspending until the item has been received. Returns
45
58
* `Failure(ChannelClosedException)` if the channel was closed.
46
59
*/
47
60
def read ()(using Async ): Res [T ] = readSource.awaitResult
48
61
end ReadableChannel
49
62
50
- /** A generic channel that can be sent to, received from and closed. */
63
+ /** A generic channel that can be sent to, received from and closed.
64
+ * @example
65
+ * {{{
66
+ * // send from one Future, read from multiple
67
+ * val ch = SyncChannel[Int]()
68
+ * val sender = Future:
69
+ * for i <- 1 to 20 do
70
+ * ch.send(i)
71
+ * ch.close()
72
+ * val receivers = (1 to 5).map: n =>
73
+ * Future:
74
+ * boundary:
75
+ * while true:
76
+ * ch.read() match
77
+ * case Right(k) => println(s"Receiver $n got: $k")
78
+ * case Left(_) => boundary.break()
79
+ *
80
+ * receivers.awaitAll
81
+ * }}}
82
+ * @see
83
+ * [[SyncChannel ]], [[BufferedChannel ]] and [[UnboundedChannel ]] for channel implementations.
84
+ */
51
85
trait Channel [T ] extends SendableChannel [T ], ReadableChannel [T ], java.io.Closeable :
86
+ /** Restrict this channel to send-only. */
52
87
inline final def asSendable : SendableChannel [T ] = this
88
+
89
+ /** Restrict this channel to read-only. */
53
90
inline final def asReadable : ReadableChannel [T ] = this
91
+
92
+ /** Restrict this channel to close-only. */
54
93
inline final def asCloseable : java.io.Closeable = this
55
94
56
95
protected type Reader = Listener [Res [T ]]
57
96
protected type Sender = Listener [Res [Unit ]]
58
97
end Channel
59
98
60
- /** SyncChannel, sometimes called a rendez-vous channel has the following semantics:
61
- * - `send` to an unclosed channel blocks until a reader commits to receiving the value (via successfully locking).
99
+ /** Synchronous channels, sometimes called rendez-vous channels, has the following semantics:
100
+ * - [[Channel.send send ]] to an unclosed channel blocks until a [[Channel.read read ]] listener commits to receiving
101
+ * the value (via successfully locking).
102
+ *
103
+ * See [[SyncChannel$.apply ]] for creation of synchronous channels.
62
104
*/
63
105
trait SyncChannel [T ] extends Channel [T ]
64
106
65
- /** BufferedChannel(size: Int) is a version of a channel with an internal value buffer (represented internally as an
66
- * array with positive size). It has the following semantics:
67
- * - `send` if the buffer is not full appends the value to the buffer and success immediately.
68
- * - `send` if the buffer is full blocks until some buffer slot is freed and assigned to this sender.
107
+ /** Buffered channels are channels with an internal value buffer (represented internally as an array with positive
108
+ * size). They have the following semantics:
109
+ * - [[Channel.send send ]], when the buffer is not full, appends the value to the buffer and success immediately.
110
+ * - [[Channel.send send ]], when the buffer is full, suspends until some buffer slot is freed and assigned to this
111
+ * sender.
112
+ *
113
+ * See [[BufferedChannel$.apply ]] for creation of buffered channels.
69
114
*/
70
115
trait BufferedChannel [T ] extends Channel [T ]
71
116
72
- /** UnboundedChannel are buffered channels that do not bound the number of items in the channel. In other words, the
73
- * buffer is treated as never being full and will expand as needed.
117
+ /** Unbounded channels are buffered channels that do not have an upper bound on the number of items in the channel. In
118
+ * other words, the buffer is treated as never being full and will expand as needed.
119
+ *
120
+ * See [[UnboundedChannel$.apply ]] for creation of unbounded channels.
74
121
*/
75
122
trait UnboundedChannel [T ] extends BufferedChannel [T ]:
76
- /** Send the item immediately. Throws [[ChannelClosedException ]] if the channel is closed. */
123
+ /** Sends the item immediately.
124
+ *
125
+ * @throws ChannelClosedException
126
+ * if the channel is closed.
127
+ */
77
128
def sendImmediately (x : T ): Unit
78
129
79
- /** This exception is being raised by [[Channel.send ]] on closed [[Channel ]], it is also returned wrapped in `Failure`
80
- * when reading form a closed channel. [[ChannelMultiplexer ]] sends `Failure(ChannelClosedException)` to all
81
- * subscribers when it receives a `close()` signal.
130
+ /** The exception raised by [[Channel.send send ]] (or [[UnboundedChannel.sendImmediately ]]) on a closed [[Channel ]].
131
+ *
132
+ * It is also returned wrapped in `Failure` when reading form a closed channel. [[ChannelMultiplexer ]] sends
133
+ * `Failure(ChannelClosedException)` to all subscribers when it receives a `close()` signal.
82
134
*/
83
135
class ChannelClosedException extends Exception
84
136
85
137
object SyncChannel :
138
+ /** Creates a new [[SyncChannel ]]. */
86
139
def apply [T ](): SyncChannel [T ] = Impl ()
87
140
88
141
private class Impl [T ] extends Channel .Impl [T ] with SyncChannel [T ]:
@@ -99,6 +152,7 @@ end SyncChannel
99
152
object BufferedChannel :
100
153
/** Create a new buffered channel with the given buffer size. */
101
154
def apply [T ](size : Int = 10 ): BufferedChannel [T ] = Impl (size)
155
+
102
156
private class Impl [T ](size : Int ) extends Channel .Impl [T ] with BufferedChannel [T ]:
103
157
require(size > 0 , " Buffered channels must have a buffer size greater than 0" )
104
158
val buf = new mutable.Queue [T ](size)
@@ -131,6 +185,7 @@ object BufferedChannel:
131
185
end BufferedChannel
132
186
133
187
object UnboundedChannel :
188
+ /** Creates a new [[UnboundedChannel ]]. */
134
189
def apply [T ](): UnboundedChannel [T ] = Impl [T ]()
135
190
136
191
private final class Impl [T ]() extends Channel .Impl [T ] with UnboundedChannel [T ] {
@@ -309,11 +364,11 @@ end Channel
309
364
* all messages sent by the publishers. The only guarantee on the order of the values the subscribers see is that
310
365
* values from the same publisher will arrive in order.
311
366
*
312
- * Channel multiplexer can also be closed, in that case all subscribers will receive Failure(ChannelClosedException)
367
+ * Channel multiplexer can also be closed, in that case all subscribers will receive ` Failure(ChannelClosedException)`
313
368
* but no attempt at closing either publishers or subscribers will be made.
314
369
*/
315
370
trait ChannelMultiplexer [T ] extends java.io.Closeable :
316
- /** Run the multiplexer synchronously. This call only returns after this multiplexer has been cancelled. */
371
+ /** Run the multiplexer. Returns after this multiplexer has been cancelled. */
317
372
def run ()(using Async ): Unit
318
373
319
374
def addPublisher (c : ReadableChannel [T ]): Unit
0 commit comments