Skip to content

Commit f6387a7

Browse files
elizarovqwwdfsad
authored andcommitted
Fuse consumeAsFlow with channel-using flow operators
1 parent f8b43e1 commit f6387a7

File tree

4 files changed

+83
-6
lines changed

4 files changed

+83
-6
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
952952
public final field context Lkotlin/coroutines/CoroutineContext;
953953
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
954954
public fun additionalToStringProps ()Ljava/lang/String;
955-
public final fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
955+
public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
956956
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
957957
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
958958
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;

kotlinx-coroutines-core/common/src/flow/Channels.kt

+51-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.atomicfu.*
1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.channels.*
1313
import kotlinx.coroutines.flow.internal.*
14+
import kotlin.coroutines.*
1415
import kotlin.jvm.*
1516
import kotlinx.coroutines.flow.unsafeFlow as flow
1617

@@ -68,19 +69,65 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
6869
* [IllegalStateException] when trying to collect it more than once.
6970
*
7071
* ### Cancellation semantics
72+
*
7173
* 1) Flow consumer is cancelled when the original channel is cancelled.
7274
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
7375
* 3) If the flow consumer fails with an exception, channel is cancelled.
7476
*
77+
* ### Operator fusion
78+
*
79+
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `consumeAsFlow` are fused.
80+
* In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls).
81+
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
7582
*/
7683
@FlowPreview
77-
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = object : Flow<T> {
78-
val collected = atomic(false)
84+
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ConsumeAsFlow(this)
85+
86+
/**
87+
* Represents an existing [channel] as [ChannelFlow] implementation.
88+
* It fuses with subsequent [flowOn] operators, but for the most part ignores the specified context.
89+
* However, additional [buffer] calls cause a separate buffering channel to be created and that is where
90+
* the context might play a role, because it is used by the producing coroutine.
91+
*/
92+
private class ConsumeAsFlow<T>(
93+
private val channel: ReceiveChannel<T>,
94+
context: CoroutineContext = EmptyCoroutineContext,
95+
capacity: Int = Channel.OPTIONAL_CHANNEL
96+
) : ChannelFlow<T>(context, capacity) {
97+
private val consumed = atomic(false)
98+
99+
private fun markConsumed() =
100+
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
101+
102+
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
103+
ConsumeAsFlow(channel, context, capacity)
104+
105+
override suspend fun collectTo(scope: ProducerScope<T>) =
106+
SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll
107+
108+
override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
109+
markConsumed() // fail fast on repeated attempt to collect it
110+
return super.broadcastImpl(scope, start)
111+
}
112+
113+
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
114+
markConsumed() // fail fast on repeated attempt to collect it
115+
return if (capacity == Channel.OPTIONAL_CHANNEL) {
116+
channel // direct
117+
} else
118+
super.produceImpl(scope) // extra buffering channel
119+
}
79120

80121
override suspend fun collect(collector: FlowCollector<T>) {
81-
check(!collected.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
82-
collector.emitAll(this@consumeAsFlow)
122+
if (capacity == Channel.OPTIONAL_CHANNEL) {
123+
markConsumed()
124+
collector.emitAll(channel) // direct
125+
} else {
126+
super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed
127+
}
83128
}
129+
130+
override fun additionalToStringProps(): String = "channel=$channel, "
84131
}
85132

86133
/**

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public abstract class ChannelFlow<T>(
6464
private val produceCapacity: Int
6565
get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity
6666

67-
fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
67+
open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> =
6868
scope.broadcast(context, produceCapacity, start, block = collectToFun)
6969

7070
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =

kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt

+30
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,36 @@ class ChannelBuildersFlowTest : TestBase() {
4949
assertFailsWith<IllegalStateException> { flow.collect() }
5050
}
5151

52+
@Test
53+
fun testConsumeAsFlowProduceFusing() = runTest {
54+
val channel = produce { send("OK") }
55+
val flow = channel.consumeAsFlow()
56+
assertSame(channel, flow.produceIn(this))
57+
assertFailsWith<IllegalStateException> { flow.produceIn(this) }
58+
channel.cancel()
59+
}
60+
61+
@Test
62+
fun testConsumeAsFlowProduceBuffered() = runTest {
63+
expect(1)
64+
val channel = produce {
65+
expect(3)
66+
(1..10).forEach { send(it) }
67+
expect(4) // produces everything because of buffering
68+
}
69+
val flow = channel.consumeAsFlow().buffer() // request buffering
70+
expect(2) // producer is not running yet
71+
val result = flow.produceIn(this)
72+
// run the flow pipeline until it consumes everything into buffer
73+
while (!channel.isClosedForReceive) yield()
74+
expect(5) // produced had done running (buffered stuff)
75+
assertNotSame(channel, result)
76+
assertFailsWith<IllegalStateException> { flow.produceIn(this) }
77+
// check that we received everything
78+
assertEquals((1..10).toList(), result.toList())
79+
finish(6)
80+
}
81+
5282
@Test
5383
fun testBroadcastChannelAsFlow() = runTest {
5484
val channel = broadcast {

0 commit comments

Comments
 (0)