Skip to content

Commit b08d61c

Browse files
authored
New flow builder: channelFlow (and its alias callbackFlow) and supple… (Kotlin#1214)
New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method Rationale: * Can be used in different context without breaking context preservation * Can be used to build concurrent operators such as merge * Can be used to integrate with callbacks * Is less error-prone than flowViaChannel because requires explicit await() call Partially fixes Kotlin#1210
1 parent f939617 commit b08d61c

File tree

9 files changed

+447
-231
lines changed

9 files changed

+447
-231
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+6
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
718718
}
719719

720720
public final class kotlinx/coroutines/channels/ProduceKt {
721+
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
722+
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
721723
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
722724
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
723725
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -796,6 +798,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
796798
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
797799
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
798800
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
801+
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
802+
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
803+
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
804+
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
799805
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
800806
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
801807
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/channels/Produce.kt

+30
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,36 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
2525
val channel: SendChannel<E>
2626
}
2727

28+
/**
29+
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
30+
* and invokes the given [block] before resuming the coroutine.
31+
*
32+
* Note that when producer channel is cancelled this function resumes with cancellation exception,
33+
* so putting the code after calling this function would not lead to its execution in case of cancellation.
34+
* That is why this code takes a lambda parameter.
35+
*
36+
* Example of usage:
37+
* ```
38+
* val callbackEventsStream = produce {
39+
* val disposable = registerChannelInCallback(channel)
40+
* awaitClose { disposable.dispose() }
41+
* }
42+
* ```
43+
*/
44+
@ExperimentalCoroutinesApi
45+
public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
46+
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
47+
try {
48+
suspendCancellableCoroutine<Unit> { cont ->
49+
invokeOnClose {
50+
cont.resume(Unit)
51+
}
52+
}
53+
} finally {
54+
block()
55+
}
56+
}
57+
2858
/**
2959
* Launches new coroutine to produce a stream of values by sending them to a channel
3060
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting

kotlinx-coroutines-core/common/src/flow/Builders.kt

+93-30
Original file line numberDiff line numberDiff line change
@@ -200,38 +200,14 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
200200
}
201201

202202
/**
203-
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
204-
* that is provided to the builder's [block] of code. It allows elements to be
205-
* produced by the code that is running in a different context, e.g. from a callback-based API.
206-
*
207-
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
208-
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
209-
* should be used instead.
210-
*
211-
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
212-
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
213-
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
214-
*
215-
* Example of usage:
216-
* ```
217-
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
218-
* val callback = object : Callback { // implementation of some callback interface
219-
* override fun onNextValue(value: T) {
220-
* channel.offer(value) // Note: offer drops value when buffer is full
221-
* }
222-
* override fun onApiError(cause: Throwable) {
223-
* channel.cancel("API Error", CancellationException(cause))
224-
* }
225-
* override fun onCompleted() = channel.close()
226-
* }
227-
* api.register(callback)
228-
* channel.invokeOnClose {
229-
* api.unregister(callback)
230-
* }
231-
* }
232-
* ```
203+
* @suppress
233204
*/
234205
@FlowPreview
206+
@Deprecated(
207+
message = "Use channelFlow instead",
208+
level = DeprecationLevel.WARNING,
209+
replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
210+
)
235211
public fun <T> flowViaChannel(
236212
bufferSize: Int = 16,
237213
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,90 @@ public fun <T> flowViaChannel(
249225
}
250226
}
251227
}
228+
229+
/**
230+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
231+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
232+
* produced by the code that is running in a different context or running concurrently.
233+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
234+
* on the resulting flow.
235+
*
236+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
237+
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
238+
* For more detailed example please refer to [callbackFlow] documentation.
239+
*
240+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
241+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
242+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
243+
*
244+
* Examples of usage:
245+
* ```
246+
* fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
247+
* launch {
248+
* collect { value -> send(value) }
249+
* }
250+
* other.collect { value -> send(value) }
251+
* }
252+
*
253+
* fun <T> contextualFlow(): Flow<T> = channelFlow {
254+
* launch(Dispatchers.IO) {
255+
* send(computeIoValue())
256+
* }
257+
*
258+
* launch(Dispatchers.Default) {
259+
* send(computeCpuValue())
260+
* }
261+
* }
262+
* ```
263+
*/
264+
@FlowPreview
265+
public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
266+
flow {
267+
coroutineScope {
268+
val channel = produce(capacity = bufferSize, block = block)
269+
channel.consumeEach { value ->
270+
emit(value)
271+
}
272+
}
273+
}
274+
275+
/**
276+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
277+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
278+
* produced by the code that is running in a different context or running concurrently.
279+
*
280+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
281+
* on the resulting flow.
282+
*
283+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
284+
* e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
285+
* callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
286+
* [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
287+
* or when callback-based API invokes [SendChannel.close] manually.
288+
*
289+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
290+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
291+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
292+
*
293+
* Example of usage:
294+
* ```
295+
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
296+
* val callback = object : Callback { // implementation of some callback interface
297+
* override fun onNextValue(value: T) {
298+
* // Note: offer drops value when buffer is full
299+
* // Channel.UNLIMITED can be used to avoid overfill
300+
* offer(value)
301+
* }
302+
* override fun onApiError(cause: Throwable) {
303+
* cancel(CancellationException("API Error", cause))
304+
* }
305+
* override fun onCompleted() = channel.close()
306+
* }
307+
* api.register(callback)
308+
* // Suspend until either onCompleted or external cancellation are invoked
309+
* await { api.unregister(callback) }
310+
* }
311+
* ```
312+
*/
313+
public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
314+
channelFlow(bufferSize) { block() }

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+54
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,60 @@ class ProduceTest : TestBase() {
9494
cancelOnCompletion(coroutineContext)
9595
}
9696

97+
@Test
98+
fun testAwaitConsumerCancellation() = runTest {
99+
val parent = Job()
100+
val channel = produce<Int>(parent) {
101+
expect(2)
102+
awaitClose { expect(4) }
103+
}
104+
expect(1)
105+
yield()
106+
expect(3)
107+
channel.cancel()
108+
parent.complete()
109+
parent.join()
110+
finish(5)
111+
}
112+
113+
@Test
114+
fun testAwaitProducerCancellation() = runTest {
115+
val parent = Job()
116+
produce<Int>(parent) {
117+
expect(2)
118+
launch {
119+
expect(3)
120+
this@produce.cancel()
121+
}
122+
awaitClose { expect(4) }
123+
}
124+
expect(1)
125+
parent.complete()
126+
parent.join()
127+
finish(5)
128+
}
129+
130+
@Test
131+
fun testAwaitParentCancellation() = runTest {
132+
val parent = Job()
133+
produce<Int>(parent) {
134+
expect(2)
135+
awaitClose { expect(4) }
136+
}
137+
expect(1)
138+
yield()
139+
expect(3)
140+
parent.cancelAndJoin()
141+
finish(5)
142+
}
143+
144+
@Test
145+
fun testAwaitIllegalState() = runTest {
146+
val channel = produce<Int> { }
147+
@Suppress("RemoveExplicitTypeArguments") // KT-31525
148+
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose<Nothing>() }
149+
}
150+
97151
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
98152
val source = Channel<Int>()
99153
expect(1)

0 commit comments

Comments
 (0)