Skip to content

Commit 09fb1b9

Browse files
committed
Flow as Flux transform
1 parent c8ed7fb commit 09fb1b9

File tree

4 files changed

+109
-59
lines changed

4 files changed

+109
-59
lines changed

reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt

+60-58
Original file line numberDiff line numberDiff line change
@@ -23,81 +23,83 @@ public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
2323
*/
2424
@Suppress("PublisherImplementation")
2525
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
26-
2726
override fun subscribe(subscriber: Subscriber<in T>?) {
2827
if (subscriber == null) throw NullPointerException()
2928
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
3029
}
30+
}
3131

32-
private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
33-
@Volatile
34-
internal var canceled: Boolean = false
35-
private val requested = AtomicLong(0L)
36-
private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
37-
38-
// This is actually optimizable
39-
private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
40-
try {
41-
consumeFlow()
42-
subscriber.onComplete()
43-
} catch (e: Throwable) {
44-
// Failed with real exception, not due to cancellation
45-
if (!coroutineContext[Job]!!.isCancelled) {
46-
subscriber.onError(e)
47-
}
32+
/** @suppress */
33+
@InternalCoroutinesApi
34+
public class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
35+
@Volatile
36+
internal var canceled: Boolean = false
37+
private val requested = AtomicLong(0L)
38+
private val producer: AtomicReference<CancellableContinuation<Unit>?> = AtomicReference()
39+
40+
// This is actually optimizable
41+
private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
42+
try {
43+
consumeFlow()
44+
subscriber.onComplete()
45+
} catch (e: Throwable) {
46+
// Failed with real exception, not due to cancellation
47+
if (!coroutineContext[Job]!!.isCancelled) {
48+
subscriber.onError(e)
4849
}
4950
}
51+
}
5052

51-
private suspend fun consumeFlow() {
52-
flow.collect { value ->
53-
if (!coroutineContext.isActive) {
54-
subscriber.onComplete()
55-
coroutineContext.ensureActive()
56-
}
53+
private suspend fun consumeFlow() {
54+
val ctx = coroutineContext
55+
flow.collect { value ->
56+
if (!coroutineContext.isActive) {
57+
subscriber.onComplete()
58+
coroutineContext.ensureActive()
59+
}
5760

58-
if (requested.get() == 0L) {
59-
suspendCancellableCoroutine<Unit> {
60-
producer.set(it)
61-
if (requested.get() != 0L) it.resumeSafely()
62-
}
61+
if (requested.get() == 0L) {
62+
suspendCancellableCoroutine<Unit> {
63+
producer.set(it)
64+
if (requested.get() != 0L) it.resumeSafely()
6365
}
64-
65-
requested.decrementAndGet()
66-
subscriber.onNext(value)
6766
}
68-
}
6967

70-
override fun cancel() {
71-
canceled = true
72-
job.cancel()
68+
requested.decrementAndGet()
69+
subscriber.onNext(value)
7370
}
71+
}
7472

75-
override fun request(n: Long) {
76-
if (n <= 0) {
77-
return
78-
}
73+
override fun cancel() {
74+
canceled = true
75+
job.cancel()
76+
}
77+
78+
override fun request(n: Long) {
79+
if (n <= 0) {
80+
return
81+
}
7982

80-
if (canceled) return
83+
if (canceled) return
8184

82-
job.start()
83-
var snapshot: Long
84-
var newValue: Long
85-
do {
86-
snapshot = requested.get()
87-
newValue = snapshot + n
88-
if (newValue <= 0L) newValue = Long.MAX_VALUE
89-
} while (!requested.compareAndSet(snapshot, newValue))
85+
job.start()
86+
var snapshot: Long
87+
var newValue: Long
88+
do {
89+
snapshot = requested.get()
90+
newValue = snapshot + n
91+
if (newValue <= 0L) newValue = Long.MAX_VALUE
92+
} while (!requested.compareAndSet(snapshot, newValue))
9093

91-
val prev = producer.get()
92-
if (prev == null || !producer.compareAndSet(prev, null)) return
93-
prev.resumeSafely()
94-
}
94+
val prev = producer.get()
95+
if (prev == null || !producer.compareAndSet(prev, null)) return
96+
prev.resumeSafely()
97+
}
9598

96-
private fun CancellableContinuation<Unit>.resumeSafely() {
97-
val token = tryResume(Unit)
98-
if (token != null) {
99-
completeResume(token)
100-
}
99+
private fun CancellableContinuation<Unit>.resumeSafely() {
100+
val token = tryResume(Unit)
101+
if (token != null) {
102+
completeResume(token)
101103
}
102104
}
103-
}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.flow.Flow
5+
import kotlinx.coroutines.flow.flowOn
6+
import kotlinx.coroutines.reactive.flow.FlowSubscription
7+
import reactor.core.CoreSubscriber
8+
import reactor.core.publisher.Flux
9+
10+
/**
11+
* Transforms the given flow to [Flux].
12+
*/
13+
@ExperimentalCoroutinesApi
14+
public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
15+
16+
private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
17+
override fun subscribe(subscriber: CoreSubscriber<in T>?) {
18+
if (subscriber == null) throw NullPointerException()
19+
subscriber.onSubscribe(FlowSubscription(flow.flowOn(subscriber.currentContext().asCoroutineContext()), subscriber))
20+
}
21+
}

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,4 @@ private fun <T> reactorPublish(
7474
val coroutine = PublisherCoroutine(newContext, subscriber)
7575
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
7676
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
77-
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package kotlinx.coroutines.reactor
2+
3+
import kotlinx.coroutines.flow.*
4+
import kotlinx.coroutines.reactive.*
5+
import kotlinx.coroutines.runBlocking
6+
import org.junit.Test
7+
import reactor.core.publisher.Mono
8+
import reactor.util.context.Context
9+
import kotlin.test.assertEquals
10+
11+
class FlowAsFluxTest {
12+
@Test
13+
fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
14+
val flux = flow<String> {
15+
(1..4).forEach { i -> emit(m(i).awaitFirst()) }
16+
} .asFlux()
17+
.subscriberContext(Context.of(1, "1"))
18+
.subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))
19+
var i = 0
20+
flux.subscribe { str -> i++; println(str); assertEquals(str, i.toString()) }
21+
}
22+
23+
private fun m(i: Int): Mono<String> = mono {
24+
val ctx = coroutineContext[ReactorContext]?.context
25+
ctx?.getOrDefault(i, "noValue")
26+
}
27+
}

0 commit comments

Comments
 (0)