Skip to content

Commit 24586c1

Browse files
committed
Cleanup FlowAsPublisher
1 parent 35f9ad5 commit 24586c1

File tree

1 file changed

+8
-14
lines changed

1 file changed

+8
-14
lines changed

reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt

+8-14
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.flow.*
99
import org.reactivestreams.*
1010
import java.util.concurrent.atomic.*
11+
import kotlin.coroutines.*
1112

1213
/**
1314
* Transforms the given flow to a spec-compliant [Publisher]
@@ -18,19 +19,14 @@ public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
1819

1920
/**
2021
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
21-
* Any calls to [cancel] cancels the original flow.
22+
* Any calls to [cancel] cancel the original flow.
2223
*/
2324
@Suppress("PublisherImplementation")
2425
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
2526

2627
override fun subscribe(subscriber: Subscriber<in T>?) {
2728
if (subscriber == null) throw NullPointerException()
28-
subscriber.onSubscribe(
29-
FlowSubscription(
30-
flow,
31-
subscriber
32-
)
33-
)
29+
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
3430
}
3531

3632
private class FlowSubscription<T>(val flow: Flow<T>, val subscriber: Subscriber<in T>) : Subscription {
@@ -45,18 +41,18 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T>
4541
consumeFlow()
4642
subscriber.onComplete()
4743
} catch (e: Throwable) {
48-
// Failed with real exception
44+
// Failed with real exception, not due to cancellation
4945
if (!coroutineContext[Job]!!.isCancelled) {
5046
subscriber.onError(e)
5147
}
5248
}
5349
}
5450

55-
private suspend fun CoroutineScope.consumeFlow() {
51+
private suspend fun consumeFlow() {
5652
flow.collect { value ->
57-
if (!isActive) {
53+
if (!coroutineContext.isActive) {
5854
subscriber.onComplete()
59-
yield() // Force cancellation
55+
coroutineContext.ensureActive()
6056
}
6157

6258
if (requested.get() == 0L) {
@@ -67,7 +63,7 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T>
6763
}
6864

6965
requested.decrementAndGet()
70-
val result = kotlin.runCatching {
66+
val result = runCatching {
7167
subscriber.onNext(value)
7268
}
7369

@@ -96,12 +92,10 @@ private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T>
9692
snapshot = requested.get()
9793
newValue = snapshot + n
9894
if (newValue <= 0L) newValue = Long.MAX_VALUE
99-
10095
} while (!requested.compareAndSet(snapshot, newValue))
10196

10297
val prev = producer.get()
10398
if (prev == null || !producer.compareAndSet(prev, null)) return
104-
10599
prev.resumeSafely()
106100
}
107101

0 commit comments

Comments
 (0)