Skip to content

Commit b4ccb30

Browse files
committed
ReceiveChannel.broadcast shall start lazy coroutine when closed
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore. * More tests added for various broadcast cancel/close cases. * The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail. * Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716). Fixes #1713
1 parent a25bf36 commit b4ccb30

File tree

2 files changed

+155
-6
lines changed

2 files changed

+155
-6
lines changed

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+50-4
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,49 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
1212
import kotlin.coroutines.intrinsics.*
13+
import kotlin.native.concurrent.*
1314

1415
/**
1516
* Broadcasts all elements of the channel.
17+
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1618
*
1719
* The kind of the resulting channel depends on the specified [capacity] parameter:
1820
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
1921
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
2022
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
2123
* otherwise -- throws [IllegalArgumentException].
2224
*
25+
* ### Cancelling broadcast
26+
*
27+
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
28+
*
29+
* Do not use [close][BroadcastChannel.close] on the resulting channel.
30+
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
31+
* but it is not as prompt.
32+
*
33+
* ### Obsolete
34+
*
35+
* This function has an inappropriate result type of [BroadcastChannel] which provides
36+
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37+
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38+
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
39+
*
2340
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
2441
*/
42+
@ObsoleteCoroutinesApi // since version 1.4.0
2543
fun <E> ReceiveChannel<E>.broadcast(
2644
capacity: Int = 1,
2745
start: CoroutineStart = CoroutineStart.LAZY
28-
): BroadcastChannel<E> =
29-
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
46+
): BroadcastChannel<E> {
47+
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
48+
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
49+
// which passes all exceptions upstream to the source ReceiveChannel
50+
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
3051
for (e in this@broadcast) {
3152
send(e)
3253
}
3354
}
55+
}
3456

3557
/**
3658
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
@@ -63,12 +85,28 @@ fun <E> ReceiveChannel<E>.broadcast(
6385
*
6486
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
6587
*
88+
* ### Cancelling broadcast
89+
*
90+
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
91+
*
92+
* Do not use [close][BroadcastChannel.close] on the resulting channel.
93+
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
94+
* coroutine is doing something else.
95+
*
96+
* ### Obsolete
97+
*
98+
* This function has an inappropriate result type of [BroadcastChannel] which provides
99+
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
100+
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
101+
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
102+
*
66103
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
67104
* @param capacity capacity of the channel's buffer (1 by default).
68105
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
69106
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
70107
* @param block the coroutine code.
71108
*/
109+
@ObsoleteCoroutinesApi // since version 1.4.0
72110
public fun <E> CoroutineScope.broadcast(
73111
context: CoroutineContext = EmptyCoroutineContext,
74112
capacity: Int = 1,
@@ -107,8 +145,9 @@ private open class BroadcastCoroutine<E>(
107145
}
108146

109147
override fun cancelInternal(cause: Throwable) {
110-
_channel.cancel(cause.toCancellationException()) // cancel the channel
111-
cancelCoroutine(cause) // cancel the job
148+
val exception = cause.toCancellationException()
149+
_channel.cancel(exception) // cancel the channel
150+
cancelCoroutine(exception) // cancel the job
112151
}
113152

114153
override fun onCompleted(value: Unit) {
@@ -119,6 +158,13 @@ private open class BroadcastCoroutine<E>(
119158
val processed = _channel.close(cause)
120159
if (!processed && !handled) handleCoroutineException(context, cause)
121160
}
161+
162+
// The BroadcastChannel could be also closed
163+
override fun close(cause: Throwable?): Boolean {
164+
val result = _channel.close(cause)
165+
start() // start coroutine if it was not started yet
166+
return result
167+
}
122168
}
123169

124170
private class LazyBroadcastCoroutine<E>(

kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt

+105-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")
6+
57
package kotlinx.coroutines.channels
68

79
import kotlinx.coroutines.*
8-
import kotlin.coroutines.*
910
import kotlin.test.*
1011

1112
class BroadcastTest : TestBase() {
@@ -34,4 +35,106 @@ class BroadcastTest : TestBase() {
3435
yield() // to broadcast
3536
finish(11)
3637
}
38+
39+
/**
40+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
41+
*/
42+
@Test
43+
fun testChannelBroadcastLazyCancel() = runTest {
44+
expect(1)
45+
val a = produce {
46+
expect(3)
47+
assertFailsWith<CancellationException> { send("MSG") }
48+
expect(5)
49+
}
50+
expect(2)
51+
yield() // to produce
52+
val b = a.broadcast()
53+
b.cancel()
54+
expect(4)
55+
yield() // to abort produce
56+
assertTrue(a.isClosedForReceive) // the source channel was consumed
57+
finish(6)
58+
}
59+
60+
@Test
61+
fun testChannelBroadcastLazyClose() = runTest {
62+
expect(1)
63+
val a = produce {
64+
expect(3)
65+
send("MSG")
66+
expect(5)
67+
}
68+
expect(2)
69+
yield() // to produce
70+
val b = a.broadcast()
71+
b.close()
72+
expect(4)
73+
yield() // to abort produce
74+
assertTrue(a.isClosedForReceive) // the source channel was consumed
75+
finish(6)
76+
}
77+
78+
@Test
79+
fun testChannelBroadcastEagerCancel() = runTest {
80+
expect(1)
81+
val a = produce<Unit> {
82+
expect(3)
83+
yield() // back to main
84+
expectUnreached() // will be cancelled
85+
}
86+
expect(2)
87+
val b = a.broadcast(start = CoroutineStart.DEFAULT)
88+
yield() // to produce
89+
expect(4)
90+
b.cancel()
91+
yield() // to produce (cancelled)
92+
assertTrue(a.isClosedForReceive) // the source channel was consumed
93+
finish(5)
94+
}
95+
96+
@Test
97+
fun testChannelBroadcastEagerClose() = runTest {
98+
expect(1)
99+
val a = produce<Unit> {
100+
expect(3)
101+
yield() // back to main
102+
// shall eventually get cancelled
103+
assertFailsWith<CancellationException> {
104+
while (true) { send(Unit) }
105+
}
106+
}
107+
expect(2)
108+
val b = a.broadcast(start = CoroutineStart.DEFAULT)
109+
yield() // to produce
110+
expect(4)
111+
b.close()
112+
yield() // to produce (closed)
113+
assertTrue(a.isClosedForReceive) // the source channel was consumed
114+
finish(5)
115+
}
116+
117+
@Test
118+
fun testBroadcastCloseWithException() = runTest {
119+
expect(1)
120+
val b = broadcast(NonCancellable, capacity = 1) {
121+
expect(2)
122+
send(1)
123+
expect(3)
124+
send(2) // suspends
125+
expect(5)
126+
// additional attempts to send fail
127+
assertFailsWith<TestException> { send(3) }
128+
}
129+
val sub = b.openSubscription()
130+
yield() // into broadcast
131+
expect(4)
132+
b.close(TestException()) // close broadcast channel with exception
133+
assertTrue(b.isClosedForSend) // sub was also closed
134+
assertEquals(1, sub.receive()) // 1st element received
135+
assertEquals(2, sub.receive()) // 2nd element received
136+
assertFailsWith<TestException> { sub.receive() } // then closed with exception
137+
yield() // to cancel broadcast
138+
finish(6)
139+
}
37140
}

0 commit comments

Comments
 (0)