Skip to content

Commit e865dbd

Browse files
fwbrasilhearnadammatteobilardiakhilender-bongirwar
authored
[core] introduce reactive Signal (#1082)
This new effect is in honor of the 600th person to start the repository. Congrats and thank you @diegobernardes!! 🎉 ### Problem Kyo doesn't offer a solution to handle reactive signals, which is a common need in UI solutions for example. ### Solution Introduce `Signal` to `kyo-core` with mutable and constant versions and methods to stream the state. ### Notes - This is inspired by fs2's `Signal`/`SignallingRef` but the implementation is more optimized with two separate `AtomicRef`s to manage the state and listeners, which reduces allocations. - Please see the scaladocs for an explanation of the functionality and implementation. - I had to add a method to allow masking unsafe promises. - I've also changed the atomic classes to provide `updateAndGet` + `getAndUpdate` for consistency. --------- Signed-off-by: Akhilender Bongirwar <[email protected]> Co-authored-by: Adam Hearn <[email protected]> Co-authored-by: Matteo Bilardi <[email protected]> Co-authored-by: Akhilender Bongirwar <[email protected]>
1 parent 9d1b4d7 commit e865dbd

File tree

10 files changed

+903
-19
lines changed

10 files changed

+903
-19
lines changed
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package kyo.internal
2+
3+
private[internal] class OsSignalPlatformSpecific:
4+
val handle: OsSignal.Handler = OsSignal.Handler.Noop

kyo-core/js-native/src/main/scala/kyo/internal/SignalPlatformSpecific.scala

Lines changed: 0 additions & 4 deletions
This file was deleted.

kyo-core/jvm/src/main/scala/kyo/internal/SignalPlatformSpecific.scala renamed to kyo-core/jvm/src/main/scala/kyo/internal/OSSignalPlatformSpecific.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package kyo.internal
22

3-
import Signal.Handler
3+
import OsSignal.Handler
44
import java.lang.invoke.MethodHandle
55
import java.lang.invoke.MethodHandles
66
import java.lang.invoke.MethodType
@@ -13,7 +13,7 @@ import kyo.Result
1313
/** This class provides a jvm-specific implementation of signal handling. It uses reflection to install signal handlers as they may not be
1414
* available on all implementations of the JVM.
1515
*/
16-
private[internal] class SignalPlatformSpecific:
16+
private[internal] class OsSignalPlatformSpecific:
1717
private val logger = Logger.getLogger("kyo.internal.Signal")
1818
val handle: Handler = {
1919
for
@@ -47,7 +47,7 @@ private[internal] class SignalPlatformSpecific:
4747
this
4848

4949
val proxy = Proxy.newProxyInstance(
50-
Signal.getClass.getClassLoader,
50+
OsSignal.getClass.getClassLoader,
5151
Array(signalHandlerClass),
5252
invocationHandler
5353
)
@@ -89,4 +89,4 @@ private[internal] class SignalPlatformSpecific:
8989
)
9090
}
9191
end initHandleStaticMethodHandle
92-
end SignalPlatformSpecific
92+
end OsSignalPlatformSpecific

kyo-core/jvm/src/test/scala/kyo/internal/SignalTest.scala renamed to kyo-core/jvm/src/test/scala/kyo/internal/OSSignalTest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import java.util.concurrent.CountDownLatch
44
import java.util.concurrent.TimeUnit
55
import kyo.Test
66

7-
class SignalTest extends Test:
7+
class OsSignalTest extends Test:
88

99
"handles on signal" in {
1010
val wasHandled = new CountDownLatch(1)
1111

12-
Signal.handle("USR2", wasHandled.countDown())
12+
OsSignal.handle("USR2", wasHandled.countDown())
1313

1414
val signal = new sun.misc.Signal("USR2")
1515
sun.misc.Signal.raise(signal)
@@ -19,7 +19,7 @@ class SignalTest extends Test:
1919

2020
"lazy" in {
2121
var wasHandled = false
22-
Signal.handle("USR2", { wasHandled = true })
22+
OsSignal.handle("USR2", { wasHandled = true })
2323
assert(!wasHandled)
2424
}
25-
end SignalTest
25+
end OsSignalTest

kyo-core/shared/src/main/scala/kyo/Atomic.scala

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,22 @@ final case class AtomicInt private (unsafe: AtomicInt.Unsafe):
9191
*/
9292
inline def addAndGet(v: Int)(using inline frame: Frame): Int < IO = IO.Unsafe(unsafe.addAndGet(v))
9393

94+
/** Atomically updates the current value using the given function and returns the old value.
95+
* @param f
96+
* The function to apply to the current value
97+
* @return
98+
* The previous value
99+
*/
100+
inline def getAndUpdate(inline f: Int => Int)(using inline frame: Frame): Int < IO = IO.Unsafe(unsafe.getAndUpdate(f))
101+
102+
/** Atomically updates the current value using the given function and returns the updated value.
103+
* @param f
104+
* The function to apply to the current value
105+
* @return
106+
* The updated value
107+
*/
108+
inline def updateAndGet(inline f: Int => Int)(using inline frame: Frame): Int < IO = IO.Unsafe(unsafe.updateAndGet(f))
109+
94110
/** Returns a string representation of the current value.
95111
* @return
96112
* A string representation of the atomic integer
@@ -158,6 +174,8 @@ object AtomicInt:
158174
inline def getAndDecrement()(using inline allow: AllowUnsafe): Int = self.getAndDecrement()
159175
inline def getAndAdd(v: Int)(using inline allow: AllowUnsafe): Int = self.getAndAdd(v)
160176
inline def addAndGet(v: Int)(using inline allow: AllowUnsafe): Int = self.addAndGet(v)
177+
inline def getAndUpdate(inline f: Int => Int)(using inline allow: AllowUnsafe): Int = self.getAndUpdate(f(_))
178+
inline def updateAndGet(inline f: Int => Int)(using inline allow: AllowUnsafe): Int = self.updateAndGet(f(_))
161179
inline def safe: AtomicInt = AtomicInt(self)
162180
end extension
163181
end Unsafe
@@ -252,6 +270,22 @@ final case class AtomicLong private (unsafe: AtomicLong.Unsafe):
252270
*/
253271
inline def addAndGet(v: Long)(using inline frame: Frame): Long < IO = IO.Unsafe(unsafe.addAndGet(v))
254272

273+
/** Atomically updates the current value using the given function and returns the old value.
274+
* @param f
275+
* The function to apply to the current value
276+
* @return
277+
* The previous value
278+
*/
279+
inline def getAndUpdate(inline f: Long => Long)(using inline frame: Frame): Long < IO = IO.Unsafe(unsafe.getAndUpdate(f(_)))
280+
281+
/** Atomically updates the current value using the given function and returns the updated value.
282+
* @param f
283+
* The function to apply to the current value
284+
* @return
285+
* The updated value
286+
*/
287+
inline def updateAndGet(inline f: Long => Long)(using inline frame: Frame): Long < IO = IO.Unsafe(unsafe.updateAndGet(f(_)))
288+
255289
/** Returns a string representation of the current value.
256290
* @return
257291
* A string representation of the atomic long
@@ -320,6 +354,8 @@ object AtomicLong:
320354
inline def getAndDecrement()(using inline allow: AllowUnsafe): Long = self.getAndDecrement()
321355
inline def getAndAdd(v: Long)(using inline allow: AllowUnsafe): Long = self.getAndAdd(v)
322356
inline def addAndGet(v: Long)(using inline allow: AllowUnsafe): Long = self.addAndGet(v)
357+
inline def getAndUpdate(inline f: Long => Long)(using inline allow: AllowUnsafe): Long = self.getAndUpdate(f(_))
358+
inline def updateAndGet(inline f: Long => Long)(using inline allow: AllowUnsafe): Long = self.updateAndGet(f(_))
323359
inline def safe: AtomicLong = AtomicLong(self)
324360
end extension
325361
end Unsafe
@@ -494,19 +530,21 @@ final case class AtomicRef[A] private (unsafe: AtomicRef.Unsafe[A]):
494530
*/
495531
inline def compareAndSet(curr: A, next: A)(using inline frame: Frame): Boolean < IO = IO.Unsafe(unsafe.compareAndSet(curr, next))
496532

497-
/** Atomically updates the current value using the given function.
533+
/** Atomically updates the current value using the given function and returns the old value.
498534
* @param f
499535
* The function to apply to the current value
536+
* @return
537+
* The previous value
500538
*/
501-
inline def update[S](f: A => A)(using inline frame: Frame): Unit < IO = updateAndGet(f).unit
539+
inline def getAndUpdate(inline f: A => A)(using inline frame: Frame): A < IO = IO.Unsafe(unsafe.getAndUpdate(f(_)))
502540

503541
/** Atomically updates the current value using the given function and returns the updated value.
504542
* @param f
505543
* The function to apply to the current value
506544
* @return
507545
* The updated value
508546
*/
509-
inline def updateAndGet[S](f: A => A)(using inline frame: Frame): A < IO = IO.Unsafe(unsafe.updateAndGet(f(_)))
547+
inline def updateAndGet(inline f: A => A)(using inline frame: Frame): A < IO = IO.Unsafe(unsafe.updateAndGet(f(_)))
510548

511549
/** Returns a string representation of the current value.
512550
* @return
@@ -556,8 +594,8 @@ object AtomicRef:
556594
inline def lazySet(v: A)(using inline allow: AllowUnsafe): Unit = self.lazySet(v)
557595
inline def getAndSet(v: A)(using inline allow: AllowUnsafe): A = self.getAndSet(v)
558596
inline def compareAndSet(curr: A, next: A)(using inline allow: AllowUnsafe): Boolean = self.compareAndSet(curr, next)
559-
def update[S](f: A => A)(using AllowUnsafe): Unit = discard(self.updateAndGet(f(_)))
560-
def updateAndGet[S](f: A => A)(using AllowUnsafe): A = self.updateAndGet(f(_))
597+
inline def getAndUpdate(inline f: A => A)(using inline allow: AllowUnsafe): A = self.getAndUpdate(f(_))
598+
inline def updateAndGet(inline f: A => A)(using inline allow: AllowUnsafe): A = self.updateAndGet(f(_))
561599
inline def safe: AtomicRef[A] = AtomicRef(self)
562600
end extension
563601
end Unsafe

kyo-core/shared/src/main/scala/kyo/Fiber.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,10 @@ object Fiber extends FiberPlatformSpecific:
679679

680680
def init[E, A]()(using AllowUnsafe): Unsafe[E, A] = IOPromise()
681681

682+
def initMasked[E, A]()(using AllowUnsafe): Unsafe[E, A] =
683+
new IOPromise[E, A]:
684+
override def interrupt[E2 >: E](error: Result.Error[E2]): Boolean = false
685+
682686
private[kyo] def fromIOPromise[E, A](p: IOPromise[E, A]): Unsafe[E, A] = p
683687

684688
extension [E, A](self: Unsafe[E, A])

0 commit comments

Comments
 (0)