Skip to content

Commit 897fefa

Browse files
committed
Implement ObservableValue<T>.asFlow()
1 parent 6810745 commit 897fefa

File tree

5 files changed

+168
-1
lines changed

5 files changed

+168
-1
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
2+
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
3+
}
4+
15
public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
26
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
37
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.value.ChangeListener
4+
import javafx.beans.value.ObservableValue
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.channels.awaitClose
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.callbackFlow
9+
import kotlinx.coroutines.flow.conflate
10+
import kotlinx.coroutines.flow.flowOn
11+
12+
/**
13+
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
14+
* its values as they change.
15+
*
16+
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
17+
* the last one will be produced.
18+
*
19+
* It produces at least one value.
20+
*
21+
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
22+
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
23+
*/
24+
@ExperimentalCoroutinesApi
25+
public fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
26+
val listener = ChangeListener<T> { _, _, newValue ->
27+
try {
28+
offer(newValue)
29+
} catch (e: CancellationException) {
30+
// In case the event fires after the channel is closed
31+
}
32+
}
33+
addListener(listener)
34+
send(value)
35+
awaitClose {
36+
removeListener(listener)
37+
}
38+
}.flowOn(Dispatchers.JavaFx).conflate()

ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt renamed to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import javafx.application.*
88
import kotlinx.coroutines.*
99
import org.junit.*
1010

11-
class JavaFxTest : TestBase() {
11+
class JavaFxDispatcherTest : TestBase() {
1212
@Before
1313
fun setup() {
1414
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.property.SimpleIntegerProperty
4+
import kotlinx.coroutines.TestBase
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.flow.*
7+
import org.junit.Before
8+
import org.junit.Test
9+
import kotlin.test.*
10+
11+
12+
class JavaFxObservableAsFlowTest : TestBase() {
13+
14+
@Before
15+
fun setup() {
16+
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
17+
}
18+
19+
@Test
20+
fun testFlowOrder() = runTest {
21+
if (!initPlatform()) {
22+
println("Skipping JavaFxTest in headless environment")
23+
return@runTest // ignore test in headless environments
24+
}
25+
26+
val integerProperty = SimpleIntegerProperty(0)
27+
val n = 10000 * stressTestMultiplier
28+
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
29+
newSingleThreadContext("setter").use { pool ->
30+
launch(pool) {
31+
for (i in 1..n) {
32+
launch(Dispatchers.JavaFx) {
33+
integerProperty.set(i)
34+
}
35+
}
36+
}
37+
var i = -1
38+
flow.collect { j ->
39+
assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
40+
i = j
41+
}
42+
}
43+
}
44+
45+
@Test
46+
fun testConflation() = runTest {
47+
if (!initPlatform()) {
48+
println("Skipping JavaFxTest in headless environment")
49+
return@runTest // ignore test in headless environments
50+
}
51+
52+
withContext(Dispatchers.JavaFx) {
53+
val END_MARKER = -1
54+
val integerProperty = SimpleIntegerProperty(0)
55+
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
56+
launch {
57+
yield() // to subscribe to [integerProperty]
58+
yield() // send 0
59+
integerProperty.set(1)
60+
expect(3)
61+
yield() // send 1
62+
expect(5)
63+
integerProperty.set(2)
64+
for (i in (-100..-2)) {
65+
integerProperty.set(i) // should be skipped due to conflation
66+
}
67+
integerProperty.set(3)
68+
expect(6)
69+
yield() // send 2 and 3
70+
integerProperty.set(-1)
71+
}
72+
expect(1)
73+
flow.collect { i ->
74+
when (i) {
75+
0 -> expect(2)
76+
1 -> expect(4)
77+
2 -> expect(7)
78+
3 -> expect(8)
79+
else -> fail("i is $i")
80+
}
81+
}
82+
finish(9)
83+
}
84+
}
85+
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import javafx.beans.property.SimpleIntegerProperty
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.flow.first
6+
import org.junit.Before
7+
import org.junit.Test
8+
9+
class JavaFxStressTest : TestBase() {
10+
11+
@Before
12+
fun setup() {
13+
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
14+
}
15+
16+
@Test
17+
fun cancellationRaceStressTest() = runTest {
18+
if (!initPlatform()) {
19+
println("Skipping JavaFxTest in headless environment")
20+
return@runTest // ignore test in headless environments
21+
}
22+
23+
val integerProperty = SimpleIntegerProperty(0)
24+
val flow = integerProperty.asFlow()
25+
var i = 1
26+
val n = 1000 * stressTestMultiplier
27+
newSingleThreadContext("collector").use { pool ->
28+
repeat (n) {
29+
launch(pool) {
30+
flow.first()
31+
}
32+
withContext(Dispatchers.JavaFx) {
33+
integerProperty.set(i)
34+
}
35+
i += 1
36+
}
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)