Skip to content

Commit 81e829b

Browse files
committed
Implement ObservableValue<T>.asFlow()
1 parent 6810745 commit 81e829b

File tree

4 files changed

+147
-1
lines changed

4 files changed

+147
-1
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
2+
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
3+
}
4+
15
public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
26
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
37
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.value.ChangeListener
4+
import javafx.beans.value.ObservableValue
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.channels.awaitClose
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.callbackFlow
9+
import kotlinx.coroutines.flow.conflate
10+
11+
/**
12+
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
13+
* its values as they change.
14+
*
15+
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
16+
* the last one will be produced.
17+
*
18+
* It produces at least one value.
19+
*
20+
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
21+
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
22+
*/
23+
@ExperimentalCoroutinesApi
24+
fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
25+
val listener = ChangeListener<T> { observable, oldValue, newValue ->
26+
try {
27+
offer(newValue)
28+
} catch (e: CancellationException) {
29+
// In case the event fires after the channel is closed
30+
}
31+
}
32+
withContext(Dispatchers.JavaFx) {
33+
addListener(listener)
34+
send(value)
35+
}
36+
awaitClose {
37+
runBlocking {
38+
withContext(Dispatchers.JavaFx) {
39+
removeListener(listener)
40+
}
41+
}
42+
}
43+
}.conflate()

ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt renamed to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import javafx.application.*
88
import kotlinx.coroutines.*
99
import org.junit.*
1010

11-
class JavaFxTest : TestBase() {
11+
class JavaFxDispatcherTest : TestBase() {
1212
@Before
1313
fun setup() {
1414
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.property.SimpleIntegerProperty
4+
import kotlinx.coroutines.TestBase
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.flow.*
7+
import org.junit.Before
8+
import org.junit.Test
9+
import kotlin.test.assertTrue
10+
11+
12+
class JavaFxObservableAsFlowTest : TestBase() {
13+
14+
@Before
15+
fun setup() {
16+
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
17+
}
18+
19+
@Test
20+
fun testFlowOrder() = runTest {
21+
if (!initPlatform()) {
22+
println("Skipping JavaFxTest in headless environment")
23+
return@runTest // ignore test in headless environments
24+
}
25+
26+
val integerProperty = SimpleIntegerProperty(0)
27+
val n = 10000 * stressTestMultiplier
28+
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
29+
newSingleThreadContext("setter").use { pool ->
30+
launch(pool) {
31+
for (i in 1..n) {
32+
launch(Dispatchers.JavaFx) {
33+
integerProperty.set(i)
34+
}
35+
}
36+
}
37+
var i = -1
38+
flow.collect { j ->
39+
// elements are neither repeated nor shuffled
40+
assertTrue(i < (j as Int))
41+
i = j
42+
}
43+
// at least one element is present
44+
assertTrue(i != -1)
45+
}
46+
}
47+
48+
@Test
49+
fun testConflation() = runTest {
50+
if (!initPlatform()) {
51+
println("Skipping JavaFxTest in headless environment")
52+
return@runTest // ignore test in headless environments
53+
}
54+
55+
val END_MARKER = -1
56+
val integerProperty = SimpleIntegerProperty(0)
57+
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
58+
launch(start = CoroutineStart.UNDISPATCHED) {
59+
withContext(Dispatchers.JavaFx) {
60+
integerProperty.set(1)
61+
}
62+
withContext(Dispatchers.JavaFx) {
63+
integerProperty.set(-2) // should be skipped
64+
integerProperty.set(2)
65+
}
66+
withContext(Dispatchers.JavaFx) {
67+
integerProperty.set(END_MARKER)
68+
}
69+
}
70+
71+
flow.collect { i ->
72+
assertTrue(i == 1 || i == 2)
73+
}
74+
}
75+
76+
@Test
77+
fun cancellationRaceStressTest() = runTest {
78+
if (!initPlatform()) {
79+
println("Skipping JavaFxTest in headless environment")
80+
return@runTest // ignore test in headless environments
81+
}
82+
83+
val integerProperty = SimpleIntegerProperty(0)
84+
val flow = integerProperty.asFlow()
85+
var i = 1
86+
val n = 1000 * stressTestMultiplier
87+
newSingleThreadContext("collector").use { pool ->
88+
repeat (n) {
89+
launch(pool) {
90+
flow.first()
91+
}
92+
withContext(Dispatchers.JavaFx) {
93+
integerProperty.set(i)
94+
}
95+
i += 1
96+
}
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)