@@ -7,16 +7,40 @@ import gears.async.Listener.withLock
7
7
import gears .async .Listener .NumberedLock
8
8
import scala .util .boundary
9
9
10
- /** A context that allows to suspend waiting for asynchronous data sources
10
+ /** The async context: provides the capability to asynchronously [[Async.await await ]] for [[Async.Source Source ]]s, and
11
+ * defines a scope for structured concurrency through a [[CompletionGroup ]].
12
+ *
13
+ * As both a context and a capability, the idiomatic way of using [[Async ]] is to be implicitly passed around
14
+ * functions, as an `using` parameter:
15
+ * {{{
16
+ * def function()(using Async): T = ???
17
+ * }}}
18
+ *
19
+ * It is not recommended to store [[Async ]] in a class field, since it complicates scoping rules.
20
+ *
21
+ * @param support
22
+ * An implementation of the underlying asynchronous operations (suspend and resume). See [[AsyncSupport ]].
23
+ * @param scheduler
24
+ * An implementation of a scheduler, for scheduling computation as they are spawned or resumed. See [[Scheduler ]].
25
+ *
26
+ * @see
27
+ * [[Async$.blocking Async.blocking ]] for a way to construct an [[Async ]] instance.
28
+ * @see
29
+ * [[Async$.group Async.group ]] and [[Future$.apply Future.apply ]] for [[Async ]]-subscoping operations.
11
30
*/
12
31
trait Async (using val support : AsyncSupport , val scheduler : support.Scheduler ):
13
- /** Wait for completion of async source `src` and return the result */
32
+ /** Waits for completion of source `src` and returns the result. Suspends the computation.
33
+ *
34
+ * @see
35
+ * [[Async.Source.awaitResult ]] and [[Async$.await ]] for extension methods calling [[Async!.await ]] from the source
36
+ * itself.
37
+ */
14
38
def await [T ](src : Async .Source [T ]): T
15
39
16
- /** The cancellation group for this Async */
40
+ /** Returns the cancellation group for this [[ Async ]] context. */
17
41
def group : CompletionGroup
18
42
19
- /** An Async of the same kind as this one, with a new cancellation group */
43
+ /** Returns an [[ Async ]] context of the same kind as this one, with a new cancellation group. */
20
44
def withGroup (group : CompletionGroup ): Async
21
45
22
46
object Async :
@@ -53,7 +77,7 @@ object Async:
53
77
def blocking [T ](body : Async .Spawn ?=> T )(using support : AsyncSupport , scheduler : support.Scheduler ): T =
54
78
group(body)(using Blocking (CompletionGroup .Unlinked ))
55
79
56
- /** The currently executing Async context */
80
+ /** Returns the currently executing Async context. Equivalent to `summon[Async]`. */
57
81
inline def current (using async : Async ): Async = async
58
82
59
83
/** [[Async.Spawn ]] is a special subtype of [[Async ]], also capable of spawning runnable [[Future ]]s.
@@ -63,8 +87,8 @@ object Async:
63
87
*/
64
88
opaque type Spawn <: Async = Async
65
89
66
- /** Runs [[ body ]] inside a spawnable context where it is allowed to spawning concurrently runnable [[Future ]]s. When
67
- * the body returns, all spawned futures are cancelled and waited for.
90
+ /** Runs ` body` inside a spawnable context where it is allowed to spawn concurrently runnable [[Future ]]s. When the
91
+ * body returns, all spawned futures are cancelled and waited for.
68
92
*/
69
93
def group [T ](body : Async .Spawn ?=> T )(using Async ): T =
70
94
withNewCompletionGroup(CompletionGroup ().link())(body)
@@ -86,51 +110,78 @@ object Async:
86
110
group.waitCompletion()(using completionAsync)
87
111
88
112
/** An asynchronous data source. Sources can be persistent or ephemeral. A persistent source will always pass same
89
- * data to calls of `poll and `onComplete`. An ephemeral source can pass new data in every call. An example of a
90
- * persistent source is `Future`. An example of an ephemeral source is `Channel`.
113
+ * data to calls of [[Source!.poll ]] and [[Source!.onComplete ]]. An ephemeral source can pass new data in every call.
114
+ *
115
+ * @see
116
+ * An example of a persistent source is [[gears.async.Future ]].
117
+ * @see
118
+ * An example of an ephemeral source is [[gears.async.Channel ]].
91
119
*/
92
120
trait Source [+ T ]:
93
-
94
- /** Check whether data is available at present and pass it to k if so. If no element is available, does not lock k
95
- * and returns false immediately. If there is (or may be) data available, the listener is locked and if it fails,
96
- * true is returned to signal this source's general availability. If locking k succeeds, only return true iff k's
97
- * complete is called. Calls to `poll` are always synchronous.
121
+ /** Checks whether data is available at present and pass it to `k` if so. Calls to `poll` are always synchronous and
122
+ * non-blocking.
123
+ *
124
+ * If no element is available, returns `false` immediately. If there is (or may be) data available, `k` is locked
125
+ * and if it fails, `true` is returned to signal this source's general availability. If locking `k` succeeds, only
126
+ * return `true` iff `k` is completed (it is always unlocked nevertheless).
127
+ *
128
+ * @return
129
+ * Whether poll was able to pass data to `k`. Note that this is regardless of `k` being available to receive the
130
+ * data. In most cases, one should pass `k` into [[Source!.onComplete ]] if `poll` returns `false`.
98
131
*/
99
132
def poll (k : Listener [T ]): Boolean
100
133
101
- /** Once data is available, pass it to function `k`. `k` returns true iff the data was consumed in an async block.
102
- * Calls to `onComplete` are usually asynchronous, meaning that the passed continuation `k` is a suspension.
134
+ /** Once data is available, pass it to the listener `k`. `onComplete` is always non-blocking.
135
+ *
136
+ * Note that `k`'s methods will be executed on the same thread as the [[Source ]], usually in sequence. It is hence
137
+ * important that the listener itself does not perform expensive operations.
103
138
*/
104
139
def onComplete (k : Listener [T ]): Unit
105
140
106
- /** Signal that listener `k` is dead (i.e. will always return `false` from now on). This permits original, (i.e.
107
- * non-derived) sources like futures or channels to drop the listener from their waiting sets.
141
+ /** Signal that listener `k` is dead (i.e. will always fail to acquire locks from now on), and should be removed
142
+ * from `onComplete` queues.
143
+ *
144
+ * This permits original, (i.e. non-derived) sources like futures or channels to drop the listener from their
145
+ * waiting sets.
108
146
*/
109
147
def dropListener (k : Listener [T ]): Unit
110
148
111
- /** Utility method for direct polling. */
149
+ /** Similar to [[Async.Source!.poll(k:Listener[T])* poll]], but instead of passing in a listener, directly return
150
+ * the value `T` if it is available.
151
+ */
112
152
def poll (): Option [T ] =
113
153
var resultOpt : Option [T ] = None
114
154
poll(Listener .acceptingListener { (x, _) => resultOpt = Some (x) })
115
155
resultOpt
116
156
117
- /** Utility method for direct waiting with `Async`. */
157
+ /** Waits for an item to arrive from the source. Suspends until an item returns.
158
+ *
159
+ * This is an utility method for direct waiting with `Async`, instead of going through listeners.
160
+ */
118
161
final def awaitResult (using ac : Async ) = ac.await(this )
119
162
end Source
120
163
121
164
extension [T ](src : Source [scala.util.Try [T ]])
122
- /** Waits for an item to arrive from the source, then automatically unwraps it. */
165
+ /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
166
+ * @see
167
+ * [[Source!.awaitResult awaitResult ]] for non-unwrapping await.
168
+ */
123
169
inline def await (using Async ) = src.awaitResult.get
124
170
extension [E , T ](src : Source [Either [E , T ]])
125
- /** Waits for an item to arrive from the source, then automatically unwraps it. */
171
+ /** Waits for an item to arrive from the source, then automatically unwraps it. Suspends until an item returns.
172
+ * @see
173
+ * [[Source!.awaitResult awaitResult ]] for non-unwrapping await.
174
+ */
126
175
inline def await (using Async ) = src.awaitResult.right.get
127
176
128
- /** An original source has a standard definition of `onComplete` in terms of `poll` and `addListener`. Implementations
129
- * should be the resource owner to handle listener queue and completion using an object monitor on the instance.
177
+ /** An original source has a standard definition of [[Source.onComplete onComplete ]] in terms of [[Source.poll poll ]]
178
+ * and [[OriginalSource.addListener addListener ]].
179
+ *
180
+ * Implementations should be the resource owner to handle listener queue and completion using an object monitor on
181
+ * the instance.
130
182
*/
131
183
abstract class OriginalSource [+ T ] extends Source [T ]:
132
-
133
- /** Add `k` to the listener set of this source */
184
+ /** Add `k` to the listener set of this source. */
134
185
protected def addListener (k : Listener [T ]): Unit
135
186
136
187
def onComplete (k : Listener [T ]): Unit = synchronized :
@@ -139,7 +190,12 @@ object Async:
139
190
end OriginalSource
140
191
141
192
object Source :
142
- /** Create a [[Source ]] containing the given values, resolved once for each. */
193
+ /** Create a [[Source ]] containing the given values, resolved once for each.
194
+ *
195
+ * @return
196
+ * an ephemeral source of values arriving to listeners in a queue. Once all values are received, attaching a
197
+ * listener with [[Source!.onComplete onComplete ]] will be a no-op (i.e. the listener will never be called).
198
+ */
143
199
def values [T ](values : T * ) =
144
200
import scala .collection .JavaConverters ._
145
201
val q = java.util.concurrent.ConcurrentLinkedQueue [T ]()
@@ -163,8 +219,14 @@ object Async:
163
219
164
220
extension [T ](src : Source [T ])
165
221
/** Create a new source that requires the original source to run the given transformation function on every value
166
- * received. Note that [[f ]] is **always** run on the computation that produces the values from the original
167
- * source, so this is very likely to run **sequentially** and be a performance bottleneck.
222
+ * received.
223
+ *
224
+ * Note that `f` is **always** run on the computation that produces the values from the original source, so this is
225
+ * very likely to run **sequentially** and be a performance bottleneck.
226
+ *
227
+ * @param f
228
+ * the transformation function to be run on every value. `f` is run *before* the item is passed to the
229
+ * [[Listener ]].
168
230
*/
169
231
def transformValuesWith [U ](f : T => U ) =
170
232
new Source [U ]:
@@ -182,7 +244,23 @@ object Async:
182
244
def dropListener (k : Listener [U ]): Unit =
183
245
src.dropListener(transform(k))
184
246
247
+ /** Creates a source that "races" a list of sources.
248
+ *
249
+ * Listeners attached to this source is resolved with the first item arriving from one of the sources. If multiple
250
+ * sources are available at the same time, one of the items will be returned with no priority. Items that are not
251
+ * returned are '''not''' consumed from the upstream sources.
252
+ *
253
+ * @see
254
+ * [[raceWithOrigin ]] for a race source that also returns the upstream origin of the item.
255
+ * @see
256
+ * [[Async$.select Async.select ]] for a convenient syntax to race sources and awaiting them with [[Async ]].
257
+ */
185
258
def race [T ](sources : Source [T ]* ): Source [T ] = raceImpl[T , T ]((v, _) => v)(sources* )
259
+
260
+ /** Like [[race ]], but the returned value includes a reference to the upstream source that the item came from.
261
+ * @see
262
+ * [[Async$.select Async.select ]] for a convenient syntax to race sources and awaiting them with [[Async ]].
263
+ */
186
264
def raceWithOrigin [T ](sources : Source [T ]* ): Source [(T , Source [T ])] =
187
265
raceImpl[(T , Source [T ]), T ]((v, src) => (v, src))(sources* )
188
266
@@ -260,28 +338,59 @@ object Async:
260
338
261
339
/** Cases for handling async sources in a [[select ]]. [[SelectCase ]] can be constructed by extension methods `handle`
262
340
* of [[Source ]].
341
+ *
342
+ * @see
343
+ * [[handle Source.handle ]] (and its operator alias [[~~> ~~> ]])
344
+ * @see
345
+ * [[Async$.select Async.select ]] where [[SelectCase ]] is used.
263
346
*/
264
347
opaque type SelectCase [T ] = (Source [? ], Nothing => T )
265
348
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe
266
349
267
350
extension [T ](src : Source [T ])
268
- /** Attach a handler to [[src ]], creating a [[SelectCase ]]. */
351
+ /** Attach a handler to `src`, creating a [[SelectCase ]].
352
+ * @see
353
+ * [[Async$.select Async.select ]] where [[SelectCase ]] is used.
354
+ */
269
355
inline def handle [U ](f : T => U ): SelectCase [U ] = (src, f)
270
356
271
- /** Alias for [[handle ]] */
357
+ /** Alias for [[handle ]]
358
+ * @see
359
+ * [[Async$.select Async.select ]] where [[SelectCase ]] is used.
360
+ */
272
361
inline def ~~> [U ](f : T => U ): SelectCase [U ] = src.handle(f)
273
362
274
363
/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race ]],
275
- * [[select ]] guarantees exactly one of the sources are polled. Unlike `map`ping a [[ Source ]], the handler in
364
+ * [[select ]] guarantees exactly one of the sources are polled. Unlike [[ transformValuesWith ]], the handler in
276
365
* [[select ]] is run in the same async context as the calling context of [[select ]].
366
+ *
367
+ * @see
368
+ * [[handle Source.handle ]] (and its operator alias [[~~> ~~> ]]) for methods to create [[SelectCase ]]s.
369
+ * @example
370
+ * {{{
371
+ * // Race a channel read with a timeout
372
+ * val ch = SyncChannel[Int]()
373
+ * // ...
374
+ * val timeout = Future(sleep(1500.millis))
375
+ *
376
+ * Async.select(
377
+ * ch.readSrc.handle: item =>
378
+ * Some(item * 2),
379
+ * timeout ~~> _ => None
380
+ * )
381
+ * }}}
277
382
*/
278
383
def select [T ](cases : SelectCase [T ]* )(using Async ) =
279
384
val (input, which) = raceWithOrigin(cases.map(_._1)* ).awaitResult
280
385
val (_, handler) = cases.find(_._1 == which).get
281
386
handler.asInstanceOf [input.type => T ](input)
282
387
283
- /** If left (respectively, right) source succeeds with `x`, pass `Left(x)`, (respectively, Right(x)) on to the
284
- * continuation.
388
+ /** Race two sources, wrapping them respectively in [[Left ]] and [[Right ]] cases.
389
+ * @return
390
+ * a new [[Source ]] that resolves with [[Left ]] if `src1` returns an item, [[Right ]] if `src2` returns an item,
391
+ * whichever comes first.
392
+ * @see
393
+ * [[race ]] and [[select ]] for racing more than two sources.
285
394
*/
286
395
def either [T1 , T2 ](src1 : Source [T1 ], src2 : Source [T2 ]): Source [Either [T1 , T2 ]] =
287
396
race(src1.transformValuesWith(Left (_)), src2.transformValuesWith(Right (_)))
0 commit comments