Skip to content

Commit d61b1c1

Browse files
committed
Timer and AsyncOperations
1 parent f9cdfb4 commit d61b1c1

File tree

8 files changed

+48
-27
lines changed

8 files changed

+48
-27
lines changed

jvm/src/main/scala/async/JvmAsyncOperations.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package gears.async
22

3+
import language.experimental.captureChecking
4+
35
object JvmAsyncOperations extends AsyncOperations:
46

57
private def jvmInterruptible[T](fn: => T)(using Async): T =

jvm/src/main/scala/async/VThreadSupport.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package gears.async
22

3+
import language.experimental.captureChecking
4+
35
import java.lang.invoke.{MethodHandles, VarHandle}
46
import java.util.concurrent.locks.ReentrantLock
57
import scala.annotation.unchecked.uncheckedVariance
@@ -11,13 +13,15 @@ object VThreadScheduler extends Scheduler:
1113
.name("gears.async.VThread-", 0L)
1214
.factory()
1315

14-
override def execute(body: Runnable): Unit =
16+
override def execute(body: Runnable^): Unit =
1517
val th = VTFactory.newThread(body)
1618
th.start()
1719

18-
override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = ScheduledRunnable(delay, body)
20+
override def schedule(delay: FiniteDuration, body: Runnable^): Cancellable =
21+
val sr = ScheduledRunnable(delay, body)
22+
() => sr.cancel()
1923

20-
private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable) extends Cancellable {
24+
private class ScheduledRunnable(val delay: FiniteDuration, val body: Runnable^) extends Cancellable {
2125
@volatile var interruptGuard = true // to avoid interrupting the body
2226

2327
val th = VTFactory.newThread: () =>

shared/src/main/scala/async/Async.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -372,21 +372,21 @@ object Async:
372372
* @see
373373
* [[Async$.select Async.select]] where [[SelectCase]] is used.
374374
*/
375-
opaque type SelectCase[T] = (Source[?], Nothing => T)
376-
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe
375+
case class SelectCase[+T] private[Async] (src: Source[Any]^, f: Nothing => T)
376+
// ^ unsafe types, but we only construct SelectCase from `handle` which is safe
377377

378378
extension [T](src: Source[T]^)
379379
/** Attach a handler to `src`, creating a [[SelectCase]].
380380
* @see
381381
* [[Async$.select Async.select]] where [[SelectCase]] is used.
382382
*/
383-
inline def handle[U](f: T => U): SelectCase[U]^{src, f} = (src, f)
383+
def handle[U](f: T => U): SelectCase[U]^{src, f} = SelectCase(src, f)
384384

385385
/** Alias for [[handle]]
386386
* @see
387387
* [[Async$.select Async.select]] where [[SelectCase]] is used.
388388
*/
389-
inline def ~~>[U](f: T => U): SelectCase[U]^{src, f} = src.handle(f)
389+
def ~~>[U](f: T => U): SelectCase[U]^{src, f} = src.handle(f)
390390

391391
/** Race a list of sources with the corresponding handler functions, once an item has come back. Like [[race]],
392392
* [[select]] guarantees exactly one of the sources are polled. Unlike [[transformValuesWith]], the handler in
@@ -410,7 +410,7 @@ object Async:
410410
*/
411411
def select[T](cases: (SelectCase[T]^)*)(using Async) =
412412
val (input, which) = raceWithOrigin(cases.map(_._1)*).awaitResult
413-
val (_, handler) = cases.find(_._1.symbol == which).get
413+
val SelectCase(_, handler) = cases.find(_._1.symbol == which).get
414414
handler.asInstanceOf[input.type => T](input)
415415

416416
/** Race two sources, wrapping them respectively in [[Left]] and [[Right]] cases.

shared/src/main/scala/async/AsyncOperations.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package gears.async
22

3-
import gears.async.AsyncOperations.sleep
3+
import language.experimental.captureChecking
44

55
import java.util.concurrent.TimeoutException
66
import scala.concurrent.duration.FiniteDuration
@@ -19,14 +19,14 @@ object AsyncOperations:
1919
* @param millis
2020
* The duration to suspend, in milliseconds. Must be a positive integer.
2121
*/
22-
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
22+
def sleep(millis: Long)(using AsyncOperations, Async): Unit =
2323
summon[AsyncOperations].sleep(millis)
2424

2525
/** Suspends the current [[Async]] context for `duration`.
2626
* @param duration
2727
* The duration to suspend. Must be positive.
2828
*/
29-
inline def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
29+
def sleep(duration: FiniteDuration)(using AsyncOperations, Async): Unit =
3030
sleep(duration.toMillis)
3131

3232
/** Runs [[op]] with a timeout. When the timeout occurs, [[op]] is cancelled through the given [[Async]] context, and
@@ -36,7 +36,7 @@ def withTimeout[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOperatio
3636
Async.group:
3737
Async.select(
3838
Future(op).handle(_.get),
39-
Future(sleep(timeout)).handle: _ =>
39+
Future(AsyncOperations.sleep(timeout)).handle: _ =>
4040
throw TimeoutException()
4141
)
4242

@@ -47,5 +47,5 @@ def withTimeoutOption[T](timeout: FiniteDuration)(op: Async ?=> T)(using AsyncOp
4747
Async.group:
4848
Async.select(
4949
Future(op).handle(v => Some(v.get)),
50-
Future(sleep(timeout)).handle(_ => None)
50+
Future(AsyncOperations.sleep(timeout)).handle(_ => None)
5151
)

shared/src/main/scala/async/AsyncSupport.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package gears.async
22

33
import language.experimental.captureChecking
4+
45
import scala.concurrent.duration._
6+
import scala.annotation.capability
57

68
/** The delimited continuation suspension interface. Represents a suspended computation asking for a value of type `T`
79
* to continue (and eventually returning a value of type `R`).

shared/src/main/scala/async/Timer.scala

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package gears.async
22

3+
import language.experimental.captureChecking
4+
35
import gears.async.Listener
46

57
import java.util.concurrent.CancellationException
@@ -9,10 +11,12 @@ import scala.collection.mutable
911
import scala.concurrent.TimeoutException
1012
import scala.concurrent.duration._
1113
import scala.util.{Failure, Success, Try}
14+
import scala.annotation.unchecked.uncheckedCaptures
1215

1316
import AsyncOperations.sleep
1417
import Future.Promise
1518

19+
1620
/** Timer exposes a steady [[Async.Source]] of ticks that happens every `tickDuration` milliseconds. Note that the timer
1721
* does not start ticking until `start` is called (which is a blocking operation, until the timer is cancelled).
1822
*
@@ -27,23 +31,32 @@ class Timer(tickDuration: Duration) extends Cancellable {
2731
private var isCancelled = false
2832

2933
private object Source extends Async.OriginalSource[this.TimerEvent] {
30-
val listeners = mutable.Set[Listener[TimerEvent]]()
31-
def tick() = synchronized {
34+
private val listeners : mutable.Set[(Listener[TimerEvent]^) @uncheckedCaptures] =
35+
mutable.Set[(Listener[TimerEvent]^) @uncheckedCaptures]()
36+
37+
def tick(): Unit = synchronized {
3238
listeners.filterInPlace(l =>
33-
l.completeNow(TimerEvent.Tick, this)
39+
l.completeNow(TimerEvent.Tick, src)
3440
false
3541
)
3642
}
37-
override def poll(k: Listener[TimerEvent]): Boolean =
43+
override def poll(k: Listener[TimerEvent]^): Boolean =
3844
if isCancelled then k.completeNow(TimerEvent.Cancelled, this)
3945
else false // subscribing to a timer always takes you to the next tick
40-
override def dropListener(k: Listener[TimerEvent]): Unit = listeners -= k
41-
override protected def addListener(k: Listener[TimerEvent]): Unit =
46+
override def dropListener(k: Listener[TimerEvent]^): Unit = listeners -= k
47+
override protected def addListener(k: Listener[TimerEvent]^): Unit =
4248
if isCancelled then k.completeNow(TimerEvent.Cancelled, this)
4349
else
4450
Timer.this.synchronized:
4551
if isCancelled then k.completeNow(TimerEvent.Cancelled, this)
4652
else listeners += k
53+
54+
def cancel(): Unit =
55+
synchronized { isCancelled = true }
56+
src.synchronized {
57+
Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src))
58+
Source.listeners.clear()
59+
}
4760
}
4861

4962
/** Ticks of the timer are delivered through this source. Note that ticks are ephemeral. */
@@ -62,10 +75,6 @@ class Timer(tickDuration: Duration) extends Cancellable {
6275
Source.tick()
6376
loop()
6477

65-
override def cancel(): Unit =
66-
synchronized { isCancelled = true }
67-
src.synchronized {
68-
Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src))
69-
Source.listeners.clear()
70-
}
78+
override def cancel(): Unit = Source.cancel()
7179
}
80+

shared/src/main/scala/async/futures.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,11 @@ object Future:
108108
end CoreFuture
109109

110110
private class CancelSuspension[U](val src: Async.Source[U]^)(val ac: Async, val suspension: ac.support.Suspension[Try[U], Unit]) extends Cancellable:
111+
self: CancelSuspension[U]^{src, ac} =>
111112
var listener: Listener[U]^{ac} = Listener.acceptingListener[U]: (x, _) =>
112113
val completedBefore = complete()
113-
if !completedBefore then ac.support.resumeAsync(suspension)(Success(x))
114+
if !completedBefore then
115+
ac.support.resumeAsync(suspension)(Success(x))
114116
unlink()
115117
var completed = false
116118

shared/src/test/scala/TimerBehavior.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import language.experimental.captureChecking
2+
13
import gears.async.AsyncOperations._
24
import gears.async._
35

@@ -25,7 +27,7 @@ class TimerBehavior extends munit.FunSuite {
2527
assert(timer.src.awaitResult == timer.TimerEvent.Tick)
2628
}
2729

28-
def `cancel future after timeout`[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Try[T] =
30+
def `cancel future after timeout`[T](d: Duration, f: Future[T]^)(using Async, AsyncOperations): Try[T] =
2931
Async.group:
3032
f.link()
3133
val t = Future { sleep(d.toMillis) }

0 commit comments

Comments
 (0)