Skip to content

Commit fc8a4c1

Browse files
committed
Introduce ReceiveChannel.consumeAsFlow and FlowCollector.emitAll(chan)
* This is a consuming conversion -- the resulting flow can be collected just once and the channel is closed after the first collect. * The implementation is made efficient via emitAll extension. * Experimental FlowCollector.emitAll extension is introduced. * It is based on the (internal) Channel.receiveOrClose and ensures that the reference to the last emitted value is not retained (does not leak). Fixes #1340 Fixes #1333
1 parent a259825 commit fc8a4c1

File tree

6 files changed

+81
-14
lines changed

6 files changed

+81
-14
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
848848
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
849849
public static final synthetic fun combineLatest (Lkotlinx/coroutines/flow/Flow;[Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
850850
public static final fun conflate (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
851+
public static final fun consumeAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
851852
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
852853
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
853854
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
@@ -857,6 +858,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
857858
public static final fun distinctUntilChangedBy (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
858859
public static final fun drop (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
859860
public static final fun dropWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
861+
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
860862
public static final fun emitAll (Lkotlinx/coroutines/flow/FlowCollector;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
861863
public static final fun emptyFlow ()Lkotlinx/coroutines/flow/Flow;
862864
public static final fun filter (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kotlinx.coroutines.channels
66

77
import kotlinx.atomicfu.*
88
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
910
import kotlinx.coroutines.internal.*
1011
import kotlinx.coroutines.intrinsics.*
1112
import kotlinx.coroutines.selects.*

kotlinx-coroutines-core/common/src/channels/Channel.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
package kotlinx.coroutines.channels
88

99
import kotlinx.coroutines.*
10+
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
11+
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
1012
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
1113
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
1214
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
13-
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
14-
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
15-
import kotlinx.coroutines.internal.systemProp
15+
import kotlinx.coroutines.flow.*
16+
import kotlinx.coroutines.internal.*
1617
import kotlinx.coroutines.selects.*
1718
import kotlin.jvm.*
1819
import kotlin.internal.*

kotlinx-coroutines-core/common/src/flow/Channels.kt

+72-8
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,81 @@
77

88
package kotlinx.coroutines.flow
99

10+
import kotlinx.atomicfu.*
1011
import kotlinx.coroutines.*
1112
import kotlinx.coroutines.channels.*
12-
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
13-
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
14-
import kotlinx.coroutines.channels.Channel.Factory.OPTIONAL_CHANNEL
1513
import kotlinx.coroutines.flow.internal.*
16-
import kotlin.coroutines.*
1714
import kotlin.jvm.*
15+
import kotlinx.coroutines.flow.unsafeFlow as flow
16+
17+
/**
18+
* Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes)
19+
* the channel afterwards. If you need to iterate over the channel without consuming it,
20+
* a regular `for` loop should be used instead.
21+
*
22+
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
23+
* See [consumeEach][ReceiveChannel.consumeEach].
24+
*/
25+
@ExperimentalCoroutinesApi
26+
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
27+
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
28+
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
29+
// fix retention of the last emitted value.
30+
// See https://youtrack.jetbrains.com/issue/KT-16222
31+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1333
32+
var cause: Throwable? = null
33+
try {
34+
while (true) {
35+
// :KLUDGE: This "run" call is resolved to an extension function "run" and forces the size of
36+
// spilled state to increase by an additional slot, so there are 4 object local variables spilled here
37+
// which makes the size of spill state equal to the 4 slots that are spilled around subsequent "emit"
38+
// call, ensuring that the previously emitted value is not retained in the state while receiving
39+
// the next one.
40+
// L$0 <- this
41+
// L$1 <- channel
42+
// L$2 <- cause
43+
// L$3 <- this$run (actually equal to this)
44+
val result = run { channel.receiveOrClosed() }
45+
if (result.isClosed) {
46+
result.closeCause?.let { throw it }
47+
break // returns normally when result.closeCause == null
48+
}
49+
// result is spilled here to the coroutine state and retained after the call, even though
50+
// it is not actually needed in the next loop iteration.
51+
// L$0 <- this
52+
// L$1 <- channel
53+
// L$2 <- cause
54+
// L$3 <- result
55+
emit(result.value)
56+
}
57+
} catch (e: Throwable) {
58+
cause = e
59+
throw e
60+
} finally {
61+
channel.cancelConsumed(cause)
62+
}
63+
}
64+
65+
/**
66+
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
67+
* on the first collection from this flow. The resulting flow can be collected just once and throws
68+
* [IllegalStateException] when trying to collect it more than once.
69+
*
70+
* ### Cancellation semantics
71+
* 1) Flow consumer is cancelled when the original channel is cancelled.
72+
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
73+
* 3) If the flow consumer fails with an exception, channel is cancelled.
74+
*
75+
*/
76+
@FlowPreview
77+
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = object : Flow<T> {
78+
val collected = atomic(false)
79+
80+
override suspend fun collect(collector: FlowCollector<T>) {
81+
check(!collected.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
82+
collector.emitAll(this@consumeAsFlow)
83+
}
84+
}
1885

1986
/**
2087
* Represents the given broadcast channel as a hot flow.
@@ -27,10 +94,7 @@ import kotlin.jvm.*
2794
*/
2895
@FlowPreview
2996
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
30-
val subscription = openSubscription()
31-
subscription.consumeEach { value ->
32-
emit(value)
33-
}
97+
emitAll(openSubscription())
3498
}
3599

36100
/**

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ public abstract class ChannelFlow<T>(
7272

7373
override suspend fun collect(collector: FlowCollector<T>) =
7474
coroutineScope {
75-
val channel = produceImpl(this)
76-
channel.consumeEach { collector.emit(it) }
75+
collector.emitAll(produceImpl(this))
7776
}
7877

7978
// debug toString

kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ private class ChannelViaBroadcast<E>(
6161
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
6262
override fun poll(): E? = sub.poll()
6363
override fun iterator(): ChannelIterator<E> = sub.iterator()
64-
64+
6565
override fun cancel(cause: CancellationException?) = sub.cancel(cause)
6666

6767
// implementing hidden method anyway, so can cast to an internal class

0 commit comments

Comments
 (0)