Skip to content

Commit b3e70a1

Browse files
committed
New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method
Rationale: * Can be used in different context without breaking context preservation * Can be used to build concurrent operators such as merge * Can be used to integrate with callbacks * Is less error-prone than flowViaChannel because requires explicit await() call Partially fixes #1210
1 parent f939617 commit b3e70a1

File tree

9 files changed

+440
-229
lines changed

9 files changed

+440
-229
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+7
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
718718
}
719719

720720
public final class kotlinx/coroutines/channels/ProduceKt {
721+
public static final fun await (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
722+
public static synthetic fun await$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
721723
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
722724
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
723725
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -796,6 +798,11 @@ public final class kotlinx/coroutines/flow/FlowKt {
796798
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
797799
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
798800
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
801+
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
802+
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
803+
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
804+
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
805+
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
799806
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
800807
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
801808
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;

kotlinx-coroutines-core/common/src/channels/Produce.kt

+26
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,32 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
2525
val channel: SendChannel<E>
2626
}
2727

28+
/**
29+
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
30+
* and invokes the given [block] before resuming the coroutine.
31+
*
32+
* Example of usage:
33+
* ```
34+
* val callbackEventsStream = produce {
35+
* val disposable = registerChannelInCallback(channel)
36+
* await { disposable.dispose() }
37+
* }
38+
* ```
39+
*/
40+
@ExperimentalCoroutinesApi
41+
public suspend fun <T> ProducerScope<T>.await(block: () -> Unit = {}) {
42+
check(kotlin.coroutines.coroutineContext[Job] === this) { "await() can be invoke only from the producer context" }
43+
suspendCancellableCoroutine<Unit> { cont ->
44+
invokeOnClose {
45+
try {
46+
block()
47+
} finally {
48+
cont.resume(Unit)
49+
}
50+
}
51+
}
52+
}
53+
2854
/**
2955
* Launches new coroutine to produce a stream of values by sending them to a channel
3056
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting

kotlinx-coroutines-core/common/src/flow/Builders.kt

+91-30
Original file line numberDiff line numberDiff line change
@@ -200,38 +200,14 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
200200
}
201201

202202
/**
203-
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
204-
* that is provided to the builder's [block] of code. It allows elements to be
205-
* produced by the code that is running in a different context, e.g. from a callback-based API.
206-
*
207-
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
208-
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
209-
* should be used instead.
210-
*
211-
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
212-
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
213-
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
214-
*
215-
* Example of usage:
216-
* ```
217-
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
218-
* val callback = object : Callback { // implementation of some callback interface
219-
* override fun onNextValue(value: T) {
220-
* channel.offer(value) // Note: offer drops value when buffer is full
221-
* }
222-
* override fun onApiError(cause: Throwable) {
223-
* channel.cancel("API Error", CancellationException(cause))
224-
* }
225-
* override fun onCompleted() = channel.close()
226-
* }
227-
* api.register(callback)
228-
* channel.invokeOnClose {
229-
* api.unregister(callback)
230-
* }
231-
* }
232-
* ```
203+
* @suppress
233204
*/
234205
@FlowPreview
206+
@Deprecated(
207+
message = "Use channelFlow instead",
208+
level = DeprecationLevel.WARNING,
209+
replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
210+
)
235211
public fun <T> flowViaChannel(
236212
bufferSize: Int = 16,
237213
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -249,3 +225,88 @@ public fun <T> flowViaChannel(
249225
}
250226
}
251227
}
228+
229+
/**
230+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
231+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
232+
* produced by the code that is running in a different context or running concurrently.
233+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
234+
* on the resulting flow.
235+
*
236+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
237+
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [await] can be used.
238+
* For more detailed example please refer to [callbackFlow] documentation.
239+
*
240+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
241+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
242+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
243+
*
244+
* Examples of usage:
245+
* ```
246+
* fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
247+
* launch {
248+
* collect { value -> send(value) }
249+
* }
250+
* other.collect { value -> send(value) }
251+
* }
252+
*
253+
* fun <T> contextualFlow(): Flow<T> = channelFlow {
254+
* launch(Dispatchers.IO) {
255+
* send(computeIoValue())
256+
* }
257+
*
258+
* launch(Dispatchers.Default) {
259+
* send(computeCpuValue())
260+
* }
261+
* }
262+
* ```
263+
*/
264+
@FlowPreview
265+
public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
266+
flow {
267+
coroutineScope {
268+
val channel = produce(capacity = bufferSize, block = block)
269+
channel.consumeEach { value ->
270+
emit(value)
271+
}
272+
}
273+
}
274+
275+
/**
276+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
277+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
278+
* produced by the code that is running in a different context or running concurrently.
279+
*
280+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
281+
* on the resulting flow.
282+
*
283+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
284+
* e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
285+
* callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
286+
* [await] extension should be used. [await] argument will be invoked when either flow consumer cancels flow collection
287+
* or when callback-based API invokes [SendChannel.close] manually.
288+
*
289+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
290+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
291+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
292+
*
293+
* Example of usage:
294+
* ```
295+
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
296+
* val callback = object : Callback { // implementation of some callback interface
297+
* override fun onNextValue(value: T) {
298+
* offer(value) // Note: offer drops value when buffer is full
299+
* }
300+
* override fun onApiError(cause: Throwable) {
301+
* cancel("API Error", CancellationException(cause))
302+
* }
303+
* override fun onCompleted() = channel.close()
304+
* }
305+
* api.register(callback)
306+
* // Suspend until either onCompleted or external cancellation are invoked
307+
* await { api.unregister(callback) }
308+
* }
309+
* ```
310+
*/
311+
public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
312+
channelFlow(bufferSize) { block() }

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+54
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,60 @@ class ProduceTest : TestBase() {
9494
cancelOnCompletion(coroutineContext)
9595
}
9696

97+
@Test
98+
fun testAwaitConsumerCancellation() = runTest {
99+
val parent = Job()
100+
val channel = produce<Int>(parent) {
101+
expect(2)
102+
await { expect(4) }
103+
}
104+
expect(1)
105+
yield()
106+
expect(3)
107+
channel.cancel()
108+
parent.complete()
109+
parent.join()
110+
finish(5)
111+
}
112+
113+
@Test
114+
fun testAwaitProducerCancellation() = runTest {
115+
val parent = Job()
116+
produce<Int>(parent) {
117+
expect(2)
118+
launch {
119+
expect(3)
120+
this@produce.cancel()
121+
}
122+
await { expect(4) }
123+
}
124+
expect(1)
125+
parent.complete()
126+
parent.join()
127+
finish(5)
128+
}
129+
130+
@Test
131+
fun testAwaitParentCancellation() = runTest {
132+
val parent = Job()
133+
produce<Int>(parent) {
134+
expect(2)
135+
await { expect(4) }
136+
}
137+
expect(1)
138+
yield()
139+
expect(3)
140+
parent.cancelAndJoin()
141+
finish(5)
142+
}
143+
144+
@Test
145+
fun testAwaitIllegalState() = runTest {
146+
val channel = produce<Int> { }
147+
@Suppress("RemoveExplicitTypeArguments") // KT-31525
148+
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).await<Nothing>() }
149+
}
150+
97151
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
98152
val source = Channel<Int>()
99153
expect(1)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class ChannelBuildersFlowTest : TestBase() {
12+
@Test
13+
fun testBroadcastChannelAsFlow() = runTest {
14+
val channel = broadcast {
15+
repeat(10) {
16+
send(it + 1)
17+
}
18+
}
19+
20+
val sum = channel.asFlow().sum()
21+
assertEquals(55, sum)
22+
}
23+
24+
@Test
25+
fun testExceptionInBroadcast() = runTest {
26+
expect(1)
27+
val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
28+
repeat(10) {
29+
send(it + 1)
30+
}
31+
throw TestException()
32+
}
33+
assertEquals(15, channel.asFlow().take(5).sum())
34+
35+
// Workaround for JS bug
36+
try {
37+
channel.asFlow().collect { /* Do nothing */ }
38+
expectUnreached()
39+
} catch (e: TestException) {
40+
finish(2)
41+
}
42+
}
43+
44+
@Test
45+
fun testBroadcastChannelAsFlowLimits() = runTest {
46+
val channel = BroadcastChannel<Int>(1)
47+
val flow = channel.asFlow().map { it * it }.drop(1).take(2)
48+
49+
var expected = 0
50+
launch {
51+
assertTrue(channel.offer(1)) // Handed to the coroutine
52+
assertTrue(channel.offer(2)) // Buffered
53+
assertFalse(channel.offer(3)) // Failed to offer
54+
channel.send(3)
55+
yield()
56+
assertEquals(1, expected)
57+
assertTrue(channel.offer(4)) // Handed to the coroutine
58+
assertTrue(channel.offer(5)) // Buffered
59+
assertFalse(channel.offer(6)) // Failed to offer
60+
channel.send(6)
61+
assertEquals(2, expected)
62+
}
63+
64+
val sum = flow.sum()
65+
assertEquals(13, sum)
66+
++expected
67+
val sum2 = flow.sum()
68+
assertEquals(61, sum2)
69+
++expected
70+
}
71+
72+
@Test
73+
fun flowAsBroadcast() = runTest {
74+
val flow = flow {
75+
repeat(10) {
76+
emit(it)
77+
}
78+
}
79+
80+
val channel = flow.broadcastIn(this)
81+
assertEquals((0..9).toList(), channel.openSubscription().toList())
82+
}
83+
84+
@Test
85+
fun flowAsBroadcastMultipleSubscription() = runTest {
86+
val flow = flow {
87+
repeat(10) {
88+
emit(it)
89+
}
90+
}
91+
92+
val broadcast = flow.broadcastIn(this)
93+
val channel = broadcast.openSubscription()
94+
val channel2 = broadcast.openSubscription()
95+
96+
assertEquals(0, channel.receive())
97+
assertEquals(0, channel2.receive())
98+
yield()
99+
assertEquals(1, channel.receive())
100+
assertEquals(1, channel2.receive())
101+
102+
channel.cancel()
103+
channel2.cancel()
104+
yield()
105+
ensureActive()
106+
}
107+
108+
@Test
109+
fun flowAsBroadcastException() = runTest {
110+
val flow = flow {
111+
repeat(10) {
112+
emit(it)
113+
}
114+
115+
throw TestException()
116+
}
117+
118+
val channel = flow.broadcastIn(this + NonCancellable)
119+
assertFailsWith<TestException> { channel.openSubscription().toList() }
120+
assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
121+
}
122+
123+
// Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
124+
@Test
125+
fun testFlowAsBroadcastAsFlow() = runTest {
126+
val flow = flow {
127+
emit(1)
128+
emit(2)
129+
emit(3)
130+
}.broadcastIn(this).asFlow()
131+
132+
assertEquals(6, flow.sum())
133+
assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
134+
}
135+
136+
@Test
137+
fun testBroadcastAsFlowAsBroadcast() = runTest {
138+
val channel = broadcast {
139+
send(1)
140+
}.asFlow().broadcastIn(this)
141+
142+
channel.openSubscription().consumeEach {
143+
assertEquals(1, it)
144+
}
145+
146+
channel.openSubscription().consumeEach {
147+
fail()
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)