Skip to content

Commit f8b43e1

Browse files
elizarovqwwdfsad
authored andcommitted
Introduce ReceiveChannel.consumeAsFlow and FlowCollector.emitAll(chan)
* This is a consuming conversion -- the resulting flow can be collected just once and the channel is closed after the first collect. * The implementation is made efficient via emitAll extension. * Experimental FlowCollector.emitAll extension is introduced. * It is based on the (internal) Channel.receiveOrClose and ensures that the reference to the last emitted value is not retained (does not leak). Fixes #1340 Fixes #1333
1 parent a8904e2 commit f8b43e1

File tree

4 files changed

+115
-10
lines changed

4 files changed

+115
-10
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
856856
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
857857
public static final fun concatWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
858858
public static final fun conflate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
859+
public static final fun consumeAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
859860
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
860861
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
861862
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -866,6 +867,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
866867
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
867868
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
868869
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
870+
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
869871
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
870872
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
871873
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/flow/Channels.kt

+72-8
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,81 @@
77

88
package kotlinx.coroutines.flow
99

10+
import kotlinx.atomicfu.*
1011
import kotlinx.coroutines.*
1112
import kotlinx.coroutines.channels.*
12-
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
13-
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
14-
import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
1513
import kotlinx.coroutines.flow.internal.*
16-
import kotlin.coroutines.*
1714
import kotlin.jvm.*
15+
import kotlinx.coroutines.flow.unsafeFlow as flow
16+
17+
/**
18+
* Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes)
19+
* the channel afterwards. If you need to iterate over the channel without consuming it,
20+
* a regular `for` loop should be used instead.
21+
*
22+
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
23+
* See [consumeEach][ReceiveChannel.consumeEach].
24+
*/
25+
@ExperimentalCoroutinesApi
26+
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
27+
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
28+
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
29+
// fix retention of the last emitted value.
30+
// See https://youtrack.jetbrains.com/issue/KT-16222
31+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1333
32+
var cause: Throwable? = null
33+
try {
34+
while (true) {
35+
// :KLUDGE: This "run" call is resolved to an extension function "run" and forces the size of
36+
// spilled state to increase by an additional slot, so there are 4 object local variables spilled here
37+
// which makes the size of spill state equal to the 4 slots that are spilled around subsequent "emit"
38+
// call, ensuring that the previously emitted value is not retained in the state while receiving
39+
// the next one.
40+
// L$0 <- this
41+
// L$1 <- channel
42+
// L$2 <- cause
43+
// L$3 <- this$run (actually equal to this)
44+
val result = run { channel.receiveOrClosed() }
45+
if (result.isClosed) {
46+
result.closeCause?.let { throw it }
47+
break // returns normally when result.closeCause == null
48+
}
49+
// result is spilled here to the coroutine state and retained after the call, even though
50+
// it is not actually needed in the next loop iteration.
51+
// L$0 <- this
52+
// L$1 <- channel
53+
// L$2 <- cause
54+
// L$3 <- result
55+
emit(result.value)
56+
}
57+
} catch (e: Throwable) {
58+
cause = e
59+
throw e
60+
} finally {
61+
channel.cancelConsumed(cause)
62+
}
63+
}
64+
65+
/**
66+
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
67+
* on the first collection from this flow. The resulting flow can be collected just once and throws
68+
* [IllegalStateException] when trying to collect it more than once.
69+
*
70+
* ### Cancellation semantics
71+
* 1) Flow consumer is cancelled when the original channel is cancelled.
72+
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
73+
* 3) If the flow consumer fails with an exception, channel is cancelled.
74+
*
75+
*/
76+
@FlowPreview
77+
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = object : Flow<T> {
78+
val collected = atomic(false)
79+
80+
override suspend fun collect(collector: FlowCollector<T>) {
81+
check(!collected.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
82+
collector.emitAll(this@consumeAsFlow)
83+
}
84+
}
1885

1986
/**
2087
* Represents the given broadcast channel as a hot flow.
@@ -27,10 +94,7 @@ import kotlin.jvm.*
2794
*/
2895
@FlowPreview
2996
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
30-
val subscription = openSubscription()
31-
subscription.consumeEach { value ->
32-
emit(value)
33-
}
97+
emitAll(openSubscription())
3498
}
3599

36100
/**

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ public abstract class ChannelFlow<T>(
7272

7373
override suspend fun collect(collector: FlowCollector<T>) =
7474
coroutineScope {
75-
val channel = produceImpl(this)
76-
channel.consumeEach { collector.emit(it) }
75+
collector.emitAll(produceImpl(this))
7776
}
7877

7978
// debug toString

kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt

+40
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,46 @@ import kotlinx.coroutines.channels.*
99
import kotlin.test.*
1010

1111
class ChannelBuildersFlowTest : TestBase() {
12+
@Test
13+
fun testChannelConsumeAsFlow() = runTest {
14+
val channel = produce {
15+
repeat(10) {
16+
send(it + 1)
17+
}
18+
}
19+
val flow = channel.consumeAsFlow()
20+
assertEquals(55, flow.sum())
21+
assertFailsWith<IllegalStateException> { flow.collect() }
22+
}
23+
24+
@Test
25+
fun testConsumeAsFlowCancellation() = runTest {
26+
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
27+
repeat(10) {
28+
send(it + 1)
29+
}
30+
throw TestException()
31+
}
32+
val flow = channel.consumeAsFlow()
33+
assertEquals(15, flow.take(5).sum())
34+
// the channel should have been canceled, even though took only 5 elements
35+
assertTrue(channel.isClosedForReceive)
36+
assertFailsWith<IllegalStateException> { flow.collect() }
37+
}
38+
39+
@Test
40+
fun testConsumeAsFlowException() = runTest {
41+
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
42+
repeat(10) {
43+
send(it + 1)
44+
}
45+
throw TestException()
46+
}
47+
val flow = channel.consumeAsFlow()
48+
assertFailsWith<TestException> { flow.sum() }
49+
assertFailsWith<IllegalStateException> { flow.collect() }
50+
}
51+
1252
@Test
1353
fun testBroadcastChannelAsFlow() = runTest {
1454
val channel = broadcast {

0 commit comments

Comments
 (0)