Skip to content

Commit 7872f8f

Browse files
authored
Mark BroadcastChannel, ConflatedBroadcastChannel and all related oper… (#2647)
* Mark BroadcastChannel, ConflatedBroadcastChannel and all related operators as obsolete API replaced with SharedFlow and StateFlow * Remove operator fusion with deprecated broadcastIn in order to simplify further Flow maintenance
1 parent 3c83c0c commit 7872f8f

File tree

12 files changed

+56
-199
lines changed

12 files changed

+56
-199
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

-1
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,6 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
11021102
public final field onBufferOverflow Lkotlinx/coroutines/channels/BufferOverflow;
11031103
public fun <init> (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)V
11041104
protected fun additionalToStringProps ()Ljava/lang/String;
1105-
public fun broadcastImpl (Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
11061105
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11071106
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
11081107
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;ILkotlinx/coroutines/channels/BufferOverflow;)Lkotlinx/coroutines/flow/internal/ChannelFlow;

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+9-4
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ import kotlin.coroutines.intrinsics.*
3535
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
3636
* the broadcasting coroutine in hard-to-specify ways.
3737
*
38-
* **Note: This API is obsolete.** It will be deprecated and replaced with the
39-
* [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator when it becomes stable.
38+
* **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
39+
* and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
40+
* operator.
4041
*
4142
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
4243
*/
44+
@ObsoleteCoroutinesApi
4345
public fun <E> ReceiveChannel<E>.broadcast(
4446
capacity: Int = 1,
4547
start: CoroutineStart = CoroutineStart.LAZY
@@ -95,17 +97,20 @@ public fun <E> ReceiveChannel<E>.broadcast(
9597
*
9698
* ### Future replacement
9799
*
100+
* This API is obsolete since 1.5.0.
98101
* This function has an inappropriate result type of [BroadcastChannel] which provides
99102
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
100-
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
101-
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
103+
* the broadcasting coroutine in hard-to-specify ways. It will be deprecated with warning in 1.6.0
104+
* and with error in 1.7.0. It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn]
105+
* operator.
102106
*
103107
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
104108
* @param capacity capacity of the channel's buffer (1 by default).
105109
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
106110
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
107111
* @param block the coroutine code.
108112
*/
113+
@ObsoleteCoroutinesApi
109114
public fun <E> CoroutineScope.broadcast(
110115
context: CoroutineContext = EmptyCoroutineContext,
111116
capacity: Int = 1,

kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
2020
* See `BroadcastChannel()` factory function for the description of available
2121
* broadcast channel implementations.
2222
*
23-
* **Note: This API is obsolete.** It will be deprecated and replaced by [SharedFlow][kotlinx.coroutines.flow.SharedFlow]
24-
* when it becomes stable.
23+
* **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
24+
* and with error in 1.7.0. It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
2525
*/
26-
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
26+
@ObsoleteCoroutinesApi
2727
public interface BroadcastChannel<E> : SendChannel<E> {
2828
/**
2929
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
@@ -60,9 +60,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
6060
* * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
6161
* * otherwise -- throws [IllegalArgumentException].
6262
*
63-
* **Note: This is an experimental api.** It may be changed in the future updates.
63+
* **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
64+
* and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow]
65+
* and [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
6466
*/
65-
@ExperimentalCoroutinesApi
67+
@ObsoleteCoroutinesApi
6668
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
6769
when (capacity) {
6870
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")

kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ import kotlin.jvm.*
2626
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
2727
* number of subscribers.
2828
*
29-
* **Note: This API is obsolete.** It will be deprecated and replaced by [StateFlow][kotlinx.coroutines.flow.StateFlow]
30-
* when it becomes stable.
29+
* **Note: This API is obsolete since 1.5.0.** It will be deprecated with warning in 1.6.0
30+
* and with error in 1.7.0. It is replaced with [StateFlow][kotlinx.coroutines.flow.StateFlow].
3131
*/
32-
@ExperimentalCoroutinesApi // not @ObsoleteCoroutinesApi to reduce burden for people who are still using it
32+
@ObsoleteCoroutinesApi
3333
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
3434
/**
3535
* Creates an instance of this class that already holds a value.

kotlinx-coroutines-core/common/src/flow/Builders.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public fun <T> flowViaChannel(
234234
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
235235
* than consumed, i.e. to control the back-pressure behavior.
236236
*
237-
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
237+
* Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
238238
* always fused so that only one properly configured channel is used for execution.
239239
*
240240
* Examples of usage:
@@ -289,7 +289,7 @@ public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.()
289289
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
290290
* than consumed, i.e. to control the back-pressure behavior.
291291
*
292-
* Adjacent applications of [callbackFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
292+
* Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are
293293
* always fused so that only one properly configured channel is used for execution.
294294
*
295295
* Example of usage that converts a multi-shot callback API to a flow.

kotlinx-coroutines-core/common/src/flow/Channels.kt

+23-21
Original file line numberDiff line numberDiff line change
@@ -133,17 +133,12 @@ private class ChannelAsFlow<T>(
133133
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
134134
ChannelAsFlow(channel, consume, context, capacity, onBufferOverflow)
135135

136-
override fun dropChannelOperators(): Flow<T>? =
136+
override fun dropChannelOperators(): Flow<T> =
137137
ChannelAsFlow(channel, consume)
138138

139139
override suspend fun collectTo(scope: ProducerScope<T>) =
140140
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll
141141

142-
override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
143-
markConsumed() // fail fast on repeated attempt to collect it
144-
return super.broadcastImpl(scope, start)
145-
}
146-
147142
override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
148143
markConsumed() // fail fast on repeated attempt to collect it
149144
return if (capacity == Channel.OPTIONAL_CHANNEL) {
@@ -173,22 +168,16 @@ private class ChannelAsFlow<T>(
173168
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
174169
* 3) If the flow consumer fails with an exception, subscription is cancelled.
175170
*/
176-
@FlowPreview
171+
@Deprecated(
172+
level = DeprecationLevel.WARNING,
173+
message = "'BroadcastChannel' is obsolete and all coreresponding operators are deprecated " +
174+
"in the favour of StateFlow and SharedFlow"
175+
) // Since 1.5.0, was @FlowPreview, safe to remove in 1.7.0
177176
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
178177
emitAll(openSubscription())
179178
}
180179

181180
/**
182-
* Creates a [broadcast] coroutine that collects the given flow.
183-
*
184-
* This transformation is **stateful**, it launches a [broadcast] coroutine
185-
* that collects the given flow and thus resulting channel should be properly closed or cancelled.
186-
*
187-
* A channel with [default][Channel.Factory.BUFFERED] buffer size is created.
188-
* Use [buffer] operator on the flow before calling `broadcastIn` to specify a value other than
189-
* default and to control what happens when data is produced faster than it is consumed,
190-
* that is to control backpressure behavior.
191-
*
192181
* ### Deprecated
193182
*
194183
* **This API is deprecated.** The [BroadcastChannel] provides a complex channel-like API for hot flows.
@@ -202,13 +191,26 @@ public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
202191
@Deprecated(
203192
message = "Use shareIn operator and the resulting SharedFlow as a replacement for BroadcastChannel",
204193
replaceWith = ReplaceWith("this.shareIn(scope, SharingStarted.Lazily, 0)"),
205-
level = DeprecationLevel.WARNING
206-
)
194+
level = DeprecationLevel.ERROR
195+
) // WARNING in 1.4.0, error in 1.5.0, removed in 1.6.0 (was @FlowPreview)
207196
public fun <T> Flow<T>.broadcastIn(
208197
scope: CoroutineScope,
209198
start: CoroutineStart = CoroutineStart.LAZY
210-
): BroadcastChannel<T> =
211-
asChannelFlow().broadcastImpl(scope, start)
199+
): BroadcastChannel<T> {
200+
// Backwards compatibility with operator fusing
201+
val channelFlow = asChannelFlow()
202+
val capacity = when (channelFlow.onBufferOverflow) {
203+
BufferOverflow.SUSPEND -> channelFlow.produceCapacity
204+
BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
205+
BufferOverflow.DROP_LATEST ->
206+
throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
207+
}
208+
return scope.broadcast(channelFlow.context, capacity = capacity, start = start) {
209+
collect { value ->
210+
send(value)
211+
}
212+
}
213+
}
212214

213215
/**
214216
* Creates a [produce] coroutine that collects the given flow.

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ import kotlin.native.concurrent.*
8080
* ### SharedFlow vs BroadcastChannel
8181
*
8282
* Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel]
83-
* and is designed to completely replace `BroadcastChannel` in the future.
83+
* and is designed to completely replace it.
8484
* It has the following important differences:
8585
*
8686
* * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ import kotlin.native.concurrent.*
8888
* ### StateFlow vs ConflatedBroadcastChannel
8989
*
9090
* Conceptually, state flow is similar to [ConflatedBroadcastChannel]
91-
* and is designed to completely replace `ConflatedBroadcastChannel` in the future.
91+
* and is designed to completely replace it.
9292
* It has the following important differences:
9393
*
9494
* * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows

kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt

+3-15
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public interface FusibleFlow<T> : Flow<T> {
3737
/**
3838
* Operators that use channels as their "output" extend this `ChannelFlow` and are always fused with each other.
3939
* This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting
40-
* methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it
40+
* methods like ability to [produceIn] the corresponding flow, thus making it
4141
* possible to directly use the backing channel if it exists (hence the `ChannelFlow` name).
4242
*
4343
* @suppress **This an internal API and should not be used from general code.**
@@ -59,7 +59,7 @@ public abstract class ChannelFlow<T>(
5959
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
6060
get() = { collectTo(it) }
6161

62-
private val produceCapacity: Int
62+
internal val produceCapacity: Int
6363
get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity
6464

6565
/**
@@ -107,18 +107,6 @@ public abstract class ChannelFlow<T>(
107107

108108
protected abstract suspend fun collectTo(scope: ProducerScope<T>)
109109

110-
// broadcastImpl is used in broadcastIn operator which is obsolete and replaced by SharedFlow.
111-
// BroadcastChannel does not support onBufferOverflow beyond simple conflation
112-
public open fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
113-
val broadcastCapacity = when (onBufferOverflow) {
114-
BufferOverflow.SUSPEND -> produceCapacity
115-
BufferOverflow.DROP_OLDEST -> Channel.CONFLATED
116-
BufferOverflow.DROP_LATEST ->
117-
throw IllegalArgumentException("Broadcast channel does not support BufferOverflow.DROP_LATEST")
118-
}
119-
return scope.broadcast(context, broadcastCapacity, start, block = collectToFun)
120-
}
121-
122110
/**
123111
* Here we use ATOMIC start for a reason (#1825).
124112
* NB: [produceImpl] is used for [flowOn].
@@ -201,7 +189,7 @@ internal class ChannelFlowOperatorImpl<T>(
201189
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
202190
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
203191

204-
override fun dropChannelOperators(): Flow<T>? = flow
192+
override fun dropChannelOperators(): Flow<T> = flow
205193

206194
override suspend fun flowCollect(collector: FlowCollector<T>) =
207195
flow.collect(collector)

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ import kotlin.jvm.*
7979
*
8080
* ### Operator fusion
8181
*
82-
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
82+
* Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
8383
* always fused so that only one properly configured channel is used for execution.
8484
*
8585
* Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
@@ -176,7 +176,7 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capaci
176176
*
177177
* ### Operator fusion
178178
*
179-
* Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are
179+
* Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
180180
* always fused so that only one properly configured channel is used for execution.
181181
* **Conflation takes precedence over `buffer()` calls with any other capacity.**
182182
*
@@ -219,7 +219,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
219219
*
220220
* ### Operator fusion
221221
*
222-
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
222+
* Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
223223
* always fused so that only one properly configured channel is used for execution.
224224
*
225225
* Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>
5757
*
5858
* ### Operator fusion
5959
*
60-
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
60+
* Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
6161
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
6262
*
6363
* @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
@@ -87,7 +87,7 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
8787
*
8888
* ### Operator fusion
8989
*
90-
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
90+
* Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
9191
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
9292
*/
9393
@ExperimentalCoroutinesApi
@@ -111,7 +111,7 @@ public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
111111
*
112112
* ### Operator fusion
113113
*
114-
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
114+
* Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
115115
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
116116
*/
117117
@ExperimentalCoroutinesApi
@@ -126,7 +126,7 @@ public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge(
126126
*
127127
* ### Operator fusion
128128
*
129-
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
129+
* Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
130130
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
131131
*
132132
* When [concurrency] is greater than 1, this operator is [buffered][buffer] by default

0 commit comments

Comments
 (0)