Skip to content

Commit 01d8e6f

Browse files
committed
New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method
Rationale: * Can be used in different context without breaking context preservation * Can be used to build concurrent operators such as merge * Can be used to integrate with callbacks * Is less error-prone than flowViaChannel because requires explicit await() call Partially fixes #1210
1 parent 3fe7bd2 commit 01d8e6f

File tree

9 files changed

+439
-229
lines changed

9 files changed

+439
-229
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+6
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
719719
}
720720

721721
public final class kotlinx/coroutines/channels/ProduceKt {
722+
public static final fun await (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
723+
public static synthetic fun await$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
722724
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
723725
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
724726
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
@@ -797,6 +799,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
797799
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
798800
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
799801
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
802+
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
803+
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
804+
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
805+
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
800806
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
801807
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
802808
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;

kotlinx-coroutines-core/common/src/channels/Produce.kt

+26
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,32 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
2525
val channel: SendChannel<E>
2626
}
2727

28+
/**
29+
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
30+
* and invokes the given [block] before resuming the coroutine.
31+
*
32+
* Example of usage:
33+
* ```
34+
* val callbackEventsStream = produce {
35+
* val disposable = registerChannelInCallback(channel)
36+
* await { disposable.dispose() }
37+
* }
38+
* ```
39+
*/
40+
@ExperimentalCoroutinesApi
41+
public suspend fun <T> ProducerScope<T>.await(block: () -> Unit = {}) {
42+
check(kotlin.coroutines.coroutineContext[Job] === this) { "await() can be invoke only from the producer context" }
43+
suspendCancellableCoroutine<Unit> { cont ->
44+
invokeOnClose {
45+
try {
46+
block()
47+
} finally {
48+
cont.resume(Unit)
49+
}
50+
}
51+
}
52+
}
53+
2854
/**
2955
* Launches new coroutine to produce a stream of values by sending them to a channel
3056
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting

kotlinx-coroutines-core/common/src/flow/Builders.kt

+91-30
Original file line numberDiff line numberDiff line change
@@ -188,38 +188,14 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
188188
}
189189

190190
/**
191-
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
192-
* that is provided to the builder's [block] of code. It allows elements to be
193-
* produced by the code that is running in a different context, e.g. from a callback-based API.
194-
*
195-
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
196-
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
197-
* should be used instead.
198-
*
199-
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
200-
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
201-
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
202-
*
203-
* Example of usage:
204-
* ```
205-
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
206-
* val callback = object : Callback { // implementation of some callback interface
207-
* override fun onNextValue(value: T) {
208-
* channel.offer(value) // Note: offer drops value when buffer is full
209-
* }
210-
* override fun onApiError(cause: Throwable) {
211-
* channel.cancel("API Error", CancellationException(cause))
212-
* }
213-
* override fun onCompleted() = channel.close()
214-
* }
215-
* api.register(callback)
216-
* channel.invokeOnClose {
217-
* api.unregister(callback)
218-
* }
219-
* }
220-
* ```
191+
* @suppress
221192
*/
222193
@FlowPreview
194+
@Deprecated(
195+
message = "Use channelFlow instead",
196+
level = DeprecationLevel.WARNING,
197+
replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
198+
)
223199
public fun <T> flowViaChannel(
224200
bufferSize: Int = 16,
225201
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
@@ -237,3 +213,88 @@ public fun <T> flowViaChannel(
237213
}
238214
}
239215
}
216+
217+
/**
218+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
219+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
220+
* produced by the code that is running in a different context or running concurrently.
221+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
222+
* on the resulting flow.
223+
*
224+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
225+
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [await] can be used.
226+
* For more detailed example please refer to [callbackFlow] documentation.
227+
*
228+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
229+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
230+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
231+
*
232+
* Examples of usage:
233+
* ```
234+
* fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
235+
* launch {
236+
* collect { value -> send(value) }
237+
* }
238+
* other.collect { value -> send(value) }
239+
* }
240+
*
241+
* fun <T> contextualFlow(): Flow<T> = channelFlow {
242+
* launch(Dispatchers.IO) {
243+
* send(computeIoValue())
244+
* }
245+
*
246+
* launch(Dispatchers.Default) {
247+
* send(computeCpuValue())
248+
* }
249+
* }
250+
* ```
251+
*/
252+
@FlowPreview
253+
public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
254+
flow {
255+
coroutineScope {
256+
val channel = produce(capacity = bufferSize, block = block)
257+
channel.consumeEach { value ->
258+
emit(value)
259+
}
260+
}
261+
}
262+
263+
/**
264+
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
265+
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
266+
* produced by the code that is running in a different context or running concurrently.
267+
*
268+
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
269+
* on the resulting flow.
270+
*
271+
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
272+
* e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
273+
* callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
274+
* [await] extension should be used. [await] argument will be invoked when either flow consumer cancels flow collection
275+
* or when callback-based API invokes [SendChannel.close] manually.
276+
*
277+
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
278+
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
279+
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
280+
*
281+
* Example of usage:
282+
* ```
283+
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
284+
* val callback = object : Callback { // implementation of some callback interface
285+
* override fun onNextValue(value: T) {
286+
* offer(value) // Note: offer drops value when buffer is full
287+
* }
288+
* override fun onApiError(cause: Throwable) {
289+
* cancel("API Error", CancellationException(cause))
290+
* }
291+
* override fun onCompleted() = channel.close()
292+
* }
293+
* api.register(callback)
294+
* // Suspend until either onCompleted or external cancellation are invoked
295+
* await { api.unregister(callback) }
296+
* }
297+
* ```
298+
*/
299+
public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
300+
channelFlow(bufferSize) { block() }

kotlinx-coroutines-core/common/test/channels/ProduceTest.kt

+54
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,60 @@ class ProduceTest : TestBase() {
9494
cancelOnCompletion(coroutineContext)
9595
}
9696

97+
@Test
98+
fun testAwaitConsumerCancellation() = runTest {
99+
val parent = Job()
100+
val channel = produce<Int>(parent) {
101+
expect(2)
102+
await { expect(4) }
103+
}
104+
expect(1)
105+
yield()
106+
expect(3)
107+
channel.cancel()
108+
parent.complete()
109+
parent.join()
110+
finish(5)
111+
}
112+
113+
@Test
114+
fun testAwaitProducerCancellation() = runTest {
115+
val parent = Job()
116+
produce<Int>(parent) {
117+
expect(2)
118+
launch {
119+
expect(3)
120+
this@produce.cancel()
121+
}
122+
await { expect(4) }
123+
}
124+
expect(1)
125+
parent.complete()
126+
parent.join()
127+
finish(5)
128+
}
129+
130+
@Test
131+
fun testAwaitParentCancellation() = runTest {
132+
val parent = Job()
133+
produce<Int>(parent) {
134+
expect(2)
135+
await { expect(4) }
136+
}
137+
expect(1)
138+
yield()
139+
expect(3)
140+
parent.cancelAndJoin()
141+
finish(5)
142+
}
143+
144+
@Test
145+
fun testAwaitIllegalState() = runTest {
146+
val channel = produce<Int> { }
147+
@Suppress("RemoveExplicitTypeArguments") // KT-31525
148+
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).await<Nothing>() }
149+
}
150+
97151
private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
98152
val source = Channel<Int>()
99153
expect(1)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class ChannelBuildersFlowTest : TestBase() {
12+
@Test
13+
fun testBroadcastChannelAsFlow() = runTest {
14+
val channel = broadcast {
15+
repeat(10) {
16+
send(it + 1)
17+
}
18+
}
19+
20+
val sum = channel.asFlow().sum()
21+
assertEquals(55, sum)
22+
}
23+
24+
@Test
25+
fun testExceptionInBroadcast() = runTest {
26+
expect(1)
27+
val channel = broadcast(NonCancellable) { // otherwise failure will cancel scope as well
28+
repeat(10) {
29+
send(it + 1)
30+
}
31+
throw TestException()
32+
}
33+
assertEquals(15, channel.asFlow().take(5).sum())
34+
35+
// Workaround for JS bug
36+
try {
37+
channel.asFlow().collect { /* Do nothing */ }
38+
expectUnreached()
39+
} catch (e: TestException) {
40+
finish(2)
41+
}
42+
}
43+
44+
@Test
45+
fun testBroadcastChannelAsFlowLimits() = runTest {
46+
val channel = BroadcastChannel<Int>(1)
47+
val flow = channel.asFlow().map { it * it }.drop(1).take(2)
48+
49+
var expected = 0
50+
launch {
51+
assertTrue(channel.offer(1)) // Handed to the coroutine
52+
assertTrue(channel.offer(2)) // Buffered
53+
assertFalse(channel.offer(3)) // Failed to offer
54+
channel.send(3)
55+
yield()
56+
assertEquals(1, expected)
57+
assertTrue(channel.offer(4)) // Handed to the coroutine
58+
assertTrue(channel.offer(5)) // Buffered
59+
assertFalse(channel.offer(6)) // Failed to offer
60+
channel.send(6)
61+
assertEquals(2, expected)
62+
}
63+
64+
val sum = flow.sum()
65+
assertEquals(13, sum)
66+
++expected
67+
val sum2 = flow.sum()
68+
assertEquals(61, sum2)
69+
++expected
70+
}
71+
72+
@Test
73+
fun flowAsBroadcast() = runTest {
74+
val flow = flow {
75+
repeat(10) {
76+
emit(it)
77+
}
78+
}
79+
80+
val channel = flow.broadcastIn(this)
81+
assertEquals((0..9).toList(), channel.openSubscription().toList())
82+
}
83+
84+
@Test
85+
fun flowAsBroadcastMultipleSubscription() = runTest {
86+
val flow = flow {
87+
repeat(10) {
88+
emit(it)
89+
}
90+
}
91+
92+
val broadcast = flow.broadcastIn(this)
93+
val channel = broadcast.openSubscription()
94+
val channel2 = broadcast.openSubscription()
95+
96+
assertEquals(0, channel.receive())
97+
assertEquals(0, channel2.receive())
98+
yield()
99+
assertEquals(1, channel.receive())
100+
assertEquals(1, channel2.receive())
101+
102+
channel.cancel()
103+
channel2.cancel()
104+
yield()
105+
ensureActive()
106+
}
107+
108+
@Test
109+
fun flowAsBroadcastException() = runTest {
110+
val flow = flow {
111+
repeat(10) {
112+
emit(it)
113+
}
114+
115+
throw TestException()
116+
}
117+
118+
val channel = flow.broadcastIn(this + NonCancellable)
119+
assertFailsWith<TestException> { channel.openSubscription().toList() }
120+
assertTrue(channel.isClosedForSend) // Failure in the flow fails the channel
121+
}
122+
123+
// Semantics of these tests puzzle me, we should figure out the way to prohibit such chains
124+
@Test
125+
fun testFlowAsBroadcastAsFlow() = runTest {
126+
val flow = flow {
127+
emit(1)
128+
emit(2)
129+
emit(3)
130+
}.broadcastIn(this).asFlow()
131+
132+
assertEquals(6, flow.sum())
133+
assertEquals(0, flow.sum()) // Well suddenly flow is no longer idempotent and cold
134+
}
135+
136+
@Test
137+
fun testBroadcastAsFlowAsBroadcast() = runTest {
138+
val channel = broadcast {
139+
send(1)
140+
}.asFlow().broadcastIn(this)
141+
142+
channel.openSubscription().consumeEach {
143+
assertEquals(1, it)
144+
}
145+
146+
channel.openSubscription().consumeEach {
147+
fail()
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)