Skip to content

Commit 3f795b1

Browse files
committed
Implement ObservableValue<T>.asFlow()
1 parent 6810745 commit 3f795b1

File tree

3 files changed

+54
-1
lines changed

3 files changed

+54
-1
lines changed

Diff for: binary-compatibility-validator/reference-public-api/kotlinx-coroutines-javafx.txt

+4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
2+
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
3+
}
4+
15
public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
26
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
37
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V

Diff for: ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package kotlinx.coroutines.javafx
2+
3+
import kotlinx.coroutines.ExperimentalCoroutinesApi
4+
import javafx.beans.value.ChangeListener
5+
import javafx.beans.value.ObservableValue
6+
import kotlinx.coroutines.channels.awaitClose
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.callbackFlow
9+
import kotlinx.coroutines.flow.conflate
10+
import java.util.concurrent.atomic.AtomicInteger
11+
12+
/**
13+
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and produces
14+
* its values as they change.
15+
*
16+
* The resulting flow is conflated, meaning that if several values arrive in a quick succession, only
17+
* the last one will be produced.
18+
*
19+
* It produces at least one value.
20+
*
21+
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
22+
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
23+
*/
24+
@ExperimentalCoroutinesApi
25+
fun <T: Any> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
26+
// It is unknown which thread will produce the initial value
27+
val UNKNOWN_SOURCE = 0
28+
// 1 -- some thread succeeded in CAS, so it will offer the first value
29+
val DETERMINED_SOURCE = 1
30+
// 2 -- the first value has been offered, so everyone else may proceed
31+
val INITIAL_OFFER_PASSED = 2
32+
val initialOfferState = AtomicInteger(UNKNOWN_SOURCE)
33+
val listener = ChangeListener<T> { observable, oldValue, newValue ->
34+
while (initialOfferState.get() != INITIAL_OFFER_PASSED) {
35+
if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) {
36+
offer(newValue)
37+
initialOfferState.set(INITIAL_OFFER_PASSED)
38+
return@ChangeListener
39+
}
40+
}
41+
offer(newValue)
42+
}
43+
addListener(listener)
44+
if (initialOfferState.compareAndSet(UNKNOWN_SOURCE, DETERMINED_SOURCE)) {
45+
send(value)
46+
initialOfferState.set(INITIAL_OFFER_PASSED)
47+
}
48+
awaitClose { removeListener(listener) }
49+
}.conflate()

Diff for: ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt renamed to ui/kotlinx-coroutines-javafx/test/JavaFxDispatcherTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import javafx.application.*
88
import kotlinx.coroutines.*
99
import org.junit.*
1010

11-
class JavaFxTest : TestBase() {
11+
class JavaFxDispatcherTest : TestBase() {
1212
@Before
1313
fun setup() {
1414
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")

0 commit comments

Comments
 (0)