1
1
package gears .async
2
2
3
- import TaskSchedule .ExponentialBackoff
4
- import AsyncOperations .sleep
5
-
6
3
import scala .collection .mutable
7
- import mutable .ListBuffer
8
-
9
4
import java .util .concurrent .atomic .AtomicBoolean
10
5
import java .util .concurrent .CancellationException
11
6
import scala .compiletime .uninitialized
12
- import scala .util .{Failure , Success , Try }
13
7
import scala .annotation .unchecked .uncheckedVariance
14
8
import scala .annotation .tailrec
15
9
import scala .util
10
+ import scala .util .{Failure , Success , Try }
16
11
import scala .util .control .NonFatal
17
12
18
- /** A cancellable future that can suspend waiting for other asynchronous sources
13
+ /** Futures are [[Async.Source Source ]]s that has the following properties:
14
+ * - They represent a single value: Once resolved, [[Async.await await ]]-ing on a [[Future ]] should always return the
15
+ * same value.
16
+ * - They can potentially be cancelled, via [[Cancellable.cancel the cancel method ]].
17
+ *
18
+ * There are two kinds of futures, active and passive.
19
+ * - '''Active''' futures are ones that are spawned with [[Future.apply ]] and [[Task.start ]]. They require the
20
+ * [[Async.Spawn ]] context, and run on their own (as long as the [[Async.Spawn ]] scope has not ended). Active
21
+ * futures represent concurrent computations within Gear's structured concurrency tree. Idiomatic Gears code should
22
+ * ''never'' return active futures. Should a function be async (i.e. takes an [[Async ]] context parameter), they
23
+ * should return values or throw exceptions directly.
24
+ * - '''Passive''' futures are ones that are created by [[Future.Promise ]] (through
25
+ * [[Future.Promise.asFuture asFuture ]]) and [[Future.withResolver ]]. They represent yet-arrived values coming from
26
+ * ''outside'' of Gear's structured concurrency tree (for example, from network or the file system, or even from
27
+ * another concurrency system like [[scala.concurrent.Future Scala standard library futures ]]). Idiomatic Gears
28
+ * libraries should return this kind of [[Future ]] if deemed neccessary, but functions returning passive futures
29
+ * should ''not'' take an [[Async ]] context.
19
30
*/
20
31
trait Future [+ T ] extends Async .OriginalSource [Try [T ]], Cancellable
21
32
22
33
object Future :
23
-
24
34
/** A future that is completed explicitly by calling its `complete` method. There are three public implementations
25
35
*
26
36
* - RunnableFuture: Completion is done by running a block of code
@@ -89,6 +99,11 @@ object Future:
89
99
*/
90
100
private class RunnableFuture [+ T ](body : Async .Spawn ?=> T )(using ac : Async ) extends CoreFuture [T ]:
91
101
102
+ /** RunnableFuture maintains its own inner [[CompletionGroup ]], that is separated from the provided Async
103
+ * instance's. When the future is cancelled, we only cancel this CompletionGroup. This effectively means any
104
+ * `.await` operations within the future is cancelled *only if they link into this group*. The future body run with
105
+ * this inner group by default, but it can always opt-out (e.g. with [[uninterruptible ]]).
106
+ */
92
107
private var innerGroup : CompletionGroup = CompletionGroup ()
93
108
94
109
private def checkCancellation (): Unit =
@@ -150,13 +165,13 @@ object Future:
150
165
151
166
end RunnableFuture
152
167
153
- /** Create a future that asynchronously executes [[ body ]] that defines its result value in a [[Try ]] or returns
154
- * [[ Failure ]] if an exception was thrown .
168
+ /** Create a future that asynchronously executes ` body` that wraps its execution in a [[scala.util. Try ]]. The returned
169
+ * future is linked to the given [[ Async.Spawn ]] scope by default, i.e. it is cancelled when this scope ends .
155
170
*/
156
171
def apply [T ](body : Async .Spawn ?=> T )(using async : Async , spawnable : Async .Spawn & async.type ): Future [T ] =
157
172
RunnableFuture (body)
158
173
159
- /** A future that immediately terminates with the given result. */
174
+ /** A future that is immediately completed with the given result. */
160
175
def now [T ](result : Try [T ]): Future [T ] =
161
176
val f = CoreFuture [T ]()
162
177
f.complete(result)
@@ -172,7 +187,6 @@ object Future:
172
187
inline def rejected (exception : Throwable ): Future [Nothing ] = now(Failure (exception))
173
188
174
189
extension [T ](f1 : Future [T ])
175
-
176
190
/** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise,
177
191
* fail with the failure that was returned first.
178
192
*/
@@ -190,23 +204,24 @@ object Future:
190
204
case Right (Failure (ex)) => r.reject(ex)
191
205
})
192
206
193
- /** Parallel composition of tuples of futures. Future.Success(EmptyTuple) might be treated as Nil.
194
- */
195
- def *: [U <: Tuple ](f2 : Future [U ]): Future [T *: U ] = Future .withResolver: r =>
196
- Async
197
- .either(f1, f2)
198
- .onComplete(Listener { (v, _) =>
199
- v match
200
- case Left (Success (x1)) =>
201
- f2.onComplete(Listener { (x2, _) => r.complete(x2.map(x1 *: _)) })
202
- case Right (Success (x2)) =>
203
- f1.onComplete(Listener { (x1, _) => r.complete(x1.map(_ *: x2)) })
204
- case Left (Failure (ex)) => r.reject(ex)
205
- case Right (Failure (ex)) => r.reject(ex)
206
- })
207
+ // /** Parallel composition of tuples of futures. Disabled since scaladoc is crashing with it. (https://github.com/scala/scala3/issues/19925) */
208
+ // def *:[U <: Tuple](f2: Future[U]): Future[T *: U] = Future.withResolver: r =>
209
+ // Async
210
+ // .either(f1, f2)
211
+ // .onComplete(Listener { (v, _) =>
212
+ // v match
213
+ // case Left(Success(x1)) =>
214
+ // f2.onComplete(Listener { (x2, _) => r.complete(x2.map(x1 *: _)) })
215
+ // case Right(Success(x2)) =>
216
+ // f1.onComplete(Listener { (x1, _) => r.complete(x1.map(_ *: x2)) })
217
+ // case Left(Failure(ex)) => r.reject(ex)
218
+ // case Right(Failure(ex)) => r.reject(ex)
219
+ // })
207
220
208
221
/** Alternative parallel composition of this task with `other` task. If either task succeeds, succeed with the
209
222
* success that was returned first. Otherwise, fail with the failure that was returned last.
223
+ * @see
224
+ * [[orWithCancel ]] for an alternative version where the slower future is cancelled.
210
225
*/
211
226
def or (f2 : Future [T ]): Future [T ] = orImpl(false )(f2)
212
227
@@ -229,7 +244,11 @@ object Future:
229
244
230
245
end extension
231
246
232
- /** A promise defines a future that is be completed via the `complete` method.
247
+ /** A promise is a [[Future ]] that is be completed manually via the `complete` method.
248
+ * @see
249
+ * [[Promise$.apply ]] to create a new, empty promise.
250
+ * @see
251
+ * [[Future.withResolver ]] to create a passive [[Future ]] from callback-style asynchronous calls.
233
252
*/
234
253
trait Promise [T ] extends Future [T ]:
235
254
inline def asFuture : Future [T ] = this
@@ -238,6 +257,7 @@ object Future:
238
257
def complete (result : Try [T ]): Unit
239
258
240
259
object Promise :
260
+ /** Create a new, unresolved [[Promise ]]. */
241
261
def apply [T ](): Promise [T ] =
242
262
new CoreFuture [T ] with Promise [T ]:
243
263
override def cancel (): Unit =
@@ -389,7 +409,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
389
409
if (maxRepetitions == 1 ) ret
390
410
else {
391
411
while (maxRepetitions == 0 || repetitions < maxRepetitions) {
392
- sleep(millis)
412
+ AsyncOperations . sleep(millis)
393
413
ret = body
394
414
repetitions += 1
395
415
}
@@ -408,7 +428,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
408
428
else {
409
429
var timeToSleep = millis
410
430
while (maxRepetitions == 0 || repetitions < maxRepetitions) {
411
- sleep(timeToSleep)
431
+ AsyncOperations . sleep(timeToSleep)
412
432
timeToSleep *= exponentialBase
413
433
ret = body
414
434
repetitions += 1
@@ -427,7 +447,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
427
447
repetitions += 1
428
448
if (maxRepetitions == 1 ) ret
429
449
else {
430
- sleep(millis)
450
+ AsyncOperations . sleep(millis)
431
451
ret = body
432
452
repetitions += 1
433
453
if (maxRepetitions == 2 ) ret
@@ -436,7 +456,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
436
456
val aOld = a
437
457
a = b
438
458
b = aOld + b
439
- sleep(b * millis)
459
+ AsyncOperations . sleep(b * millis)
440
460
ret = body
441
461
repetitions += 1
442
462
}
@@ -451,7 +471,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
451
471
@ tailrec
452
472
def helper (repetitions : Long = 0 ): T =
453
473
if (repetitions > 0 && millis > 0 )
454
- sleep(millis)
474
+ AsyncOperations . sleep(millis)
455
475
val ret : T = body
456
476
ret match {
457
477
case Failure (_) => ret
@@ -467,7 +487,7 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
467
487
@ tailrec
468
488
def helper (repetitions : Long = 0 ): T =
469
489
if (repetitions > 0 && millis > 0 )
470
- sleep(millis)
490
+ AsyncOperations . sleep(millis)
471
491
val ret : T = body
472
492
ret match {
473
493
case Success (_) => ret
@@ -480,6 +500,11 @@ class Task[+T](val body: (Async, AsyncOperations) ?=> T):
480
500
481
501
end Task
482
502
503
+ /** Runs the `body` inside in an [[Async ]] context that does *not* propagate cancellation until the end.
504
+ *
505
+ * In other words, `body` is never notified of the cancellation of the `ac` context; but `uninterruptible` would still
506
+ * throw a [[CancellationException ]] ''after `body` finishes running'' if `ac` was cancelled.
507
+ */
483
508
def uninterruptible [T ](body : Async ?=> T )(using ac : Async ): T =
484
509
val tracker = Cancellable .Tracking ().link()
485
510
@@ -492,7 +517,12 @@ def uninterruptible[T](body: Async ?=> T)(using ac: Async): T =
492
517
if tracker.isCancelled then throw new CancellationException ()
493
518
r
494
519
495
- def cancellationScope [T ](cancel : Cancellable )(fn : => T )(using a : Async ): T =
496
- cancel.link()
520
+ /** Link `cancellable` to the completion group of the current [[Async ]] context during `fn`.
521
+ *
522
+ * If the [[Async ]] context is cancelled during the execution of `fn`, `cancellable` will also be immediately
523
+ * cancelled.
524
+ */
525
+ def cancellationScope [T ](cancellable : Cancellable )(fn : => T )(using a : Async ): T =
526
+ cancellable.link()
497
527
try fn
498
- finally cancel .unlink()
528
+ finally cancellable .unlink()
0 commit comments