@@ -11,6 +11,7 @@ import kotlinx.atomicfu.*
11
11
import kotlinx.coroutines.*
12
12
import kotlinx.coroutines.channels.*
13
13
import kotlinx.coroutines.flow.internal.*
14
+ import kotlin.coroutines.*
14
15
import kotlin.jvm.*
15
16
import kotlinx.coroutines.flow.unsafeFlow as flow
16
17
@@ -68,19 +69,65 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
68
69
* [IllegalStateException] when trying to collect it more than once.
69
70
*
70
71
* ### Cancellation semantics
72
+ *
71
73
* 1) Flow consumer is cancelled when the original channel is cancelled.
72
74
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
73
75
* 3) If the flow consumer fails with an exception, channel is cancelled.
74
76
*
77
+ * ### Operator fusion
78
+ *
79
+ * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `consumeAsFlow` are fused.
80
+ * In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls).
81
+ * Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
75
82
*/
76
83
@FlowPreview
77
- public fun <T > ReceiveChannel<T>.consumeAsFlow (): Flow <T > = object : Flow <T > {
78
- val collected = atomic(false )
84
+ public fun <T > ReceiveChannel<T>.consumeAsFlow (): Flow <T > = ConsumeAsFlow (this )
85
+
86
+ /* *
87
+ * Represents an existing [channel] as [ChannelFlow] implementation.
88
+ * It fuses with subsequent [flowOn] operators, but for the most part ignores the specified context.
89
+ * However, additional [buffer] calls cause a separate buffering channel to be created and that is where
90
+ * the context might play a role, because it is used by the producing coroutine.
91
+ */
92
+ private class ConsumeAsFlow <T >(
93
+ private val channel : ReceiveChannel <T >,
94
+ context : CoroutineContext = EmptyCoroutineContext ,
95
+ capacity : Int = Channel .OPTIONAL_CHANNEL
96
+ ) : ChannelFlow<T>(context, capacity) {
97
+ private val consumed = atomic(false )
98
+
99
+ private fun markConsumed () =
100
+ check(! consumed.getAndSet(true )) { " ReceiveChannel.consumeAsFlow can be collected just once" }
101
+
102
+ override fun create (context : CoroutineContext , capacity : Int ): ChannelFlow <T > =
103
+ ConsumeAsFlow (channel, context, capacity)
104
+
105
+ override suspend fun collectTo (scope : ProducerScope <T >) =
106
+ SendingCollector (scope).emitAll(channel) // use efficient channel receiving code from emitAll
107
+
108
+ override fun broadcastImpl (scope : CoroutineScope , start : CoroutineStart ): BroadcastChannel <T > {
109
+ markConsumed() // fail fast on repeated attempt to collect it
110
+ return super .broadcastImpl(scope, start)
111
+ }
112
+
113
+ override fun produceImpl (scope : CoroutineScope ): ReceiveChannel <T > {
114
+ markConsumed() // fail fast on repeated attempt to collect it
115
+ return if (capacity == Channel .OPTIONAL_CHANNEL ) {
116
+ channel // direct
117
+ } else
118
+ super .produceImpl(scope) // extra buffering channel
119
+ }
79
120
80
121
override suspend fun collect (collector : FlowCollector <T >) {
81
- check(! collected.getAndSet(true )) { " ReceiveChannel.consumeAsFlow can be collected just once" }
82
- collector.emitAll(this @consumeAsFlow)
122
+ if (capacity == Channel .OPTIONAL_CHANNEL ) {
123
+ markConsumed()
124
+ collector.emitAll(channel) // direct
125
+ } else {
126
+ super .collect(collector) // extra buffering channel, produceImpl will mark it as consumed
127
+ }
83
128
}
129
+
130
+ override fun additionalToStringProps (): String = " channel=$channel , "
84
131
}
85
132
86
133
/* *
0 commit comments