Skip to content

Commit 677a59a

Browse files
committed
Optimize Publisher.asFlow fusion and implementation size
* Calling Publisher.asFlow().produceIn(...) uses a single channel. * Implementation fully reuses Publisher.openSubscription code.
1 parent bd0034f commit 677a59a

File tree

8 files changed

+50
-85
lines changed

8 files changed

+50
-85
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+6-1
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
918918
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
919919
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
920920
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
921-
public final fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
921+
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
922922
public fun toString ()Ljava/lang/String;
923923
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
924924
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
@@ -929,6 +929,11 @@ public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coro
929929
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
930930
}
931931

932+
public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {
933+
public fun <init> (Lkotlinx/coroutines/channels/SendChannel;)V
934+
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
935+
}
936+
932937
public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
933938
public synthetic fun <init> (II)V
934939
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+6-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ import kotlin.jvm.*
1515
internal fun <T> Flow<T>.asChannelFlow(): ChannelFlow<T> =
1616
this as? ChannelFlow ?: ChannelFlowOperatorImpl(this)
1717

18-
// Operators that use channels extend this ChannelFlow and are always fused with each other
18+
/**
19+
* Operators that use channels extend this ChannelFlow and are always fused with each other.
20+
*
21+
* @suppress **This an internal API and should not be used from general code.**
22+
*/
1923
@InternalCoroutinesApi
2024
public abstract class ChannelFlow<T>(
2125
// upstream context
@@ -63,7 +67,7 @@ public abstract class ChannelFlow<T>(
6367
fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
6468
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6569

66-
fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
70+
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
6771
scope.flowProduce(context, produceCapacity, block = collectToFun)
6872

6973
override suspend fun collect(collector: FlowCollector<T>) =

kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package kotlinx.coroutines.flow.internal
66

77
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
89
import kotlinx.coroutines.channels.*
910
import kotlinx.coroutines.channels.ArrayChannel
1011
import kotlinx.coroutines.flow.*
@@ -17,8 +18,13 @@ internal fun <T> FlowCollector<T>.asConcurrentFlowCollector(): ConcurrentFlowCol
1718
// Two basic implementations are here: SendingCollector and ConcurrentFlowCollector
1819
internal interface ConcurrentFlowCollector<T> : FlowCollector<T>
1920

20-
// Concurrent collector because it sends to a channel
21-
internal class SendingCollector<T>(
21+
/**
22+
* Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently.
23+
*
24+
* @suppress **This an internal API and should not be used from general code.**
25+
*/
26+
@InternalCoroutinesApi
27+
public class SendingCollector<T>(
2228
private val channel: SendChannel<T>
2329
) : ConcurrentFlowCollector<T> {
2430
override suspend fun emit(value: T) = channel.send(value)

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ private class FlowCoroutine<T>(
7070
context: CoroutineContext,
7171
uCont: Continuation<T>
7272
) : ScopeCoroutine<T>(context, uCont) {
73-
7473
public override fun childCancelled(cause: Throwable): Boolean {
7574
if (cause is ChildCancelledException) return true
7675
return cancelImpl(cause)
@@ -81,7 +80,6 @@ private class FlowProduceCoroutine<T>(
8180
parentContext: CoroutineContext,
8281
channel: Channel<T>
8382
) : ProducerCoroutine<T>(parentContext, channel) {
84-
8583
public override fun childCancelled(cause: Throwable): Boolean {
8684
if (cause is ChildCancelledException) return true
8785
return cancelImpl(cause)

reactive/kotlinx-coroutines-reactive/src/Channel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit) =
4141
openSubscription().consumeEach(action)
4242

4343
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
44-
private class SubscriptionChannel<T>(
44+
internal class SubscriptionChannel<T>(
4545
private val request: Int
4646
) : LinkedListChannel<T>(), Subscriber<T> {
4747
init {

reactive/kotlinx-coroutines-reactive/src/flow/PublisherAsFlow.kt

+24-72
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import kotlinx.coroutines.*
88
import kotlinx.coroutines.channels.*
99
import kotlinx.coroutines.flow.*
1010
import kotlinx.coroutines.flow.internal.*
11+
import kotlinx.coroutines.reactive.*
1112
import org.reactivestreams.*
1213
import kotlin.coroutines.*
1314

@@ -34,84 +35,35 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
3435
)
3536
public fun <T : Any> Publisher<T>.asFlow(batchSize: Int): Flow<T> = asFlow().buffer(batchSize)
3637

37-
3838
private class PublisherAsFlow<T : Any>(
39-
private val publisher: Publisher<T>, capacity: Int
40-
) : ChannelFlow<T>(
41-
EmptyCoroutineContext,
42-
capacity
43-
) {
44-
45-
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
46-
return PublisherAsFlow(publisher, capacity)
47-
}
48-
49-
/*
50-
* It's possible to get rid of the second channel here, but it requires intrusive changes in ChannelFlow.
51-
* Do it after API stabilization (including produceIn/broadcastIn).
52-
*/
53-
override suspend fun collectTo(scope: ProducerScope<T>) = collect { scope.send(it) }
54-
55-
override suspend fun collect(collector: FlowCollector<T>) {
56-
val channel = Channel<T>(capacity)
57-
val subscriber = ReactiveSubscriber(channel, capacity)
58-
publisher.subscribe(subscriber)
59-
try {
60-
var consumed = 0
61-
for (i in channel) {
62-
collector.emit(i)
63-
if (++consumed == capacity) {
64-
consumed = 0
65-
subscriber.subscription.request(capacity.toLong())
66-
}
67-
}
68-
} finally {
69-
subscriber.subscription.cancel()
39+
private val publisher: Publisher<T>,
40+
capacity: Int
41+
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
42+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
43+
PublisherAsFlow(publisher, capacity)
44+
45+
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
46+
val channel = publisher.openSubscription(capacity)
47+
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
48+
channel.cancel(cause?.let {
49+
it as? CancellationException ?: CancellationException("Job was cancelled", it)
50+
})
7051
}
71-
}
72-
73-
private suspend inline fun collect(emit: (element: T) -> Unit) {
74-
val channel = Channel<T>(capacity)
75-
val subscriber = ReactiveSubscriber(channel, capacity)
76-
publisher.subscribe(subscriber)
77-
try {
78-
var consumed = 0
79-
for (i in channel) {
80-
emit(i)
81-
if (++consumed == capacity) {
82-
consumed = 0
83-
subscriber.subscription.request(capacity.toLong())
84-
}
52+
if (handle != null && handle !== NonDisposableHandle) {
53+
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
54+
(channel as SubscriptionChannel<*>).invokeOnClose {
55+
handle.dispose()
8556
}
86-
} finally {
87-
subscriber.subscription.cancel()
8857
}
89-
}
90-
}
91-
92-
@Suppress("SubscriberImplementation")
93-
private class ReactiveSubscriber<T : Any>(
94-
private val channel: Channel<T>,
95-
private val requestSize: Int
96-
) : Subscriber<T> {
97-
98-
lateinit var subscription: Subscription
99-
100-
override fun onComplete() {
101-
channel.close()
58+
return channel
10259
}
10360

104-
override fun onSubscribe(s: Subscription) {
105-
subscription = s
106-
s.request(requestSize.toLong())
107-
}
108-
109-
override fun onNext(t: T) {
110-
// Controlled by requestSize
111-
require(channel.offer(t)) { "Element $t was not added to channel because it was full, $channel" }
61+
override suspend fun collect(collector: FlowCollector<T>) {
62+
val channel = publisher.openSubscription(capacity)
63+
channel.consumeEach { collector.emit(it) }
11264
}
11365

114-
override fun onError(t: Throwable?) {
115-
channel.close(t)
116-
}
66+
// The second channel here is used only for broadcast
67+
override suspend fun collectTo(scope: ProducerScope<T>) =
68+
collect(SendingCollector(scope.channel))
11769
}

reactive/kotlinx-coroutines-reactive/test/flow/PublisherAsFlowTest.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import kotlinx.coroutines.reactive.*
1111
import kotlin.test.*
1212

1313
class PublisherAsFlowTest : TestBase() {
14-
1514
@Test
1615
fun testCancellation() = runTest {
1716
var onNext = 0
@@ -51,11 +50,11 @@ class PublisherAsFlowTest : TestBase() {
5150
send(3)
5251

5352
expect(2)
54-
send(5)
53+
send(6)
5554

5655
expect(4)
5756
send(7)
58-
expect(6)
57+
expect(5)
5958
}
6059

6160
publisher.asFlow().collect {

reactive/kotlinx-coroutines-reactive/test/flow/RangePublisherBufferedTest.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import org.reactivestreams.*
1010
import org.reactivestreams.example.unicast.*
1111
import org.reactivestreams.tck.*
1212

13-
class RangePublisherBufferedTest : PublisherVerification<Int>(TestEnvironment(50, 50)) {
14-
13+
class RangePublisherBufferedTest :
14+
PublisherVerification<Int>(TestEnvironment(50, 50))
15+
{
1516
override fun createPublisher(elements: Long): Publisher<Int> {
1617
return RangePublisher(1, elements.toInt()).asFlow().buffer(2).asPublisher()
1718
}

0 commit comments

Comments
 (0)