Skip to content

Commit 4a1835e

Browse files
committed
ReceiveChannel.broadcast shall start lazy coroutine when closed
Documentation on broadcast operators is added that explain the the resulting BroadcastChannel shall be cancelled if it is not needed anymore. Fixes #1713
1 parent cc3d8c4 commit 4a1835e

File tree

2 files changed

+140
-6
lines changed

2 files changed

+140
-6
lines changed

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+35-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -10,28 +10,43 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
1010
import kotlinx.coroutines.intrinsics.*
1111
import kotlin.coroutines.*
1212
import kotlin.coroutines.intrinsics.*
13+
import kotlin.native.concurrent.*
1314

1415
/**
1516
* Broadcasts all elements of the channel.
17+
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1618
*
1719
* The kind of the resulting channel depends on the specified [capacity] parameter:
1820
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
1921
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
2022
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
2123
* otherwise -- throws [IllegalArgumentException].
2224
*
25+
* ### Cancelling broadcast
26+
*
27+
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
28+
*
29+
* Do not use [close][BroadcastChannel.close] on the resulting channel.
30+
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
31+
* but it is not as prompt.
32+
*
2333
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
2434
*/
2535
fun <E> ReceiveChannel<E>.broadcast(
2636
capacity: Int = 1,
2737
start: CoroutineStart = CoroutineStart.LAZY
2838
): BroadcastChannel<E> =
29-
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
39+
OperatorScope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
40+
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
41+
// which passes all exceptions upstream to the source ReceiveChannel
3042
for (e in this@broadcast) {
3143
send(e)
3244
}
3345
}
3446

47+
@SharedImmutable
48+
private val OperatorScope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
49+
3550
/**
3651
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
3752
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
@@ -63,6 +78,14 @@ fun <E> ReceiveChannel<E>.broadcast(
6378
*
6479
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
6580
*
81+
* ### Cancelling broadcast
82+
*
83+
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
84+
*
85+
* Do not use [close][BroadcastChannel.close] on the resulting channel.
86+
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
87+
* coroutine is doing something else.
88+
*
6689
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
6790
* @param capacity capacity of the channel's buffer (1 by default).
6891
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
@@ -107,8 +130,9 @@ private open class BroadcastCoroutine<E>(
107130
}
108131

109132
override fun cancelInternal(cause: Throwable) {
110-
_channel.cancel(cause.toCancellationException()) // cancel the channel
111-
cancelCoroutine(cause) // cancel the job
133+
val exception = cause.toCancellationException()
134+
_channel.cancel(exception) // cancel the channel
135+
cancelCoroutine(exception) // cancel the job
112136
}
113137

114138
override fun onCompleted(value: Unit) {
@@ -119,6 +143,13 @@ private open class BroadcastCoroutine<E>(
119143
val processed = _channel.close(cause)
120144
if (!processed && !handled) handleCoroutineException(context, cause)
121145
}
146+
147+
// The BroadcastChannel could be also closed
148+
override fun close(cause: Throwable?): Boolean {
149+
val result = _channel.close(cause)
150+
start() // start coroutine if it was not started yet
151+
return result
152+
}
122153
}
123154

124155
private class LazyBroadcastCoroutine<E>(

kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt

+105-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

5+
@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")
6+
57
package kotlinx.coroutines.channels
68

79
import kotlinx.coroutines.*
8-
import kotlin.coroutines.*
910
import kotlin.test.*
1011

1112
class BroadcastTest : TestBase() {
@@ -34,4 +35,106 @@ class BroadcastTest : TestBase() {
3435
yield() // to broadcast
3536
finish(11)
3637
}
38+
39+
/**
40+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
41+
*/
42+
@Test
43+
fun testChannelBroadcastLazyCancel() = runTest {
44+
expect(1)
45+
val a = produce {
46+
expect(3)
47+
assertFailsWith<CancellationException> { send("MSG") }
48+
expect(5)
49+
}
50+
expect(2)
51+
yield() // to produce
52+
val b = a.broadcast()
53+
b.cancel()
54+
expect(4)
55+
yield() // to abort produce
56+
assertTrue(a.isClosedForReceive) // the source channel was consumed
57+
finish(6)
58+
}
59+
60+
@Test
61+
fun testChannelBroadcastLazyClose() = runTest {
62+
expect(1)
63+
val a = produce {
64+
expect(3)
65+
send("MSG")
66+
expect(5)
67+
}
68+
expect(2)
69+
yield() // to produce
70+
val b = a.broadcast()
71+
b.close()
72+
expect(4)
73+
yield() // to abort produce
74+
assertTrue(a.isClosedForReceive) // the source channel was consumed
75+
finish(6)
76+
}
77+
78+
@Test
79+
fun testChannelBroadcastEagerCancel() = runTest {
80+
expect(1)
81+
val a = produce<Unit> {
82+
expect(3)
83+
yield() // back to main
84+
expectUnreached() // will be cancelled
85+
}
86+
expect(2)
87+
val b = a.broadcast(start = CoroutineStart.DEFAULT)
88+
yield() // to produce
89+
expect(4)
90+
b.cancel()
91+
yield() // to produce (cancelled)
92+
assertTrue(a.isClosedForReceive) // the source channel was consumed
93+
finish(5)
94+
}
95+
96+
@Test
97+
fun testChannelBroadcastEagerClose() = runTest {
98+
expect(1)
99+
val a = produce<Unit> {
100+
expect(3)
101+
yield() // back to main
102+
// shall eventually get cancelled
103+
assertFailsWith<CancellationException> {
104+
while (true) { send(Unit) }
105+
}
106+
}
107+
expect(2)
108+
val b = a.broadcast(start = CoroutineStart.DEFAULT)
109+
yield() // to produce
110+
expect(4)
111+
b.close()
112+
yield() // to produce (closed)
113+
assertTrue(a.isClosedForReceive) // the source channel was consumed
114+
finish(5)
115+
}
116+
117+
@Test
118+
fun testBroadcastCloseWithException() = runTest {
119+
expect(1)
120+
val b = broadcast(NonCancellable, capacity = 1) {
121+
expect(2)
122+
send(1)
123+
expect(3)
124+
send(2) // suspends
125+
expect(5)
126+
// additional attempts to send fail
127+
assertFailsWith<TestException> { send(3) }
128+
}
129+
val sub = b.openSubscription()
130+
yield() // into broadcast
131+
expect(4)
132+
b.close(TestException()) // close broadcast channel with exception
133+
assertTrue(b.isClosedForSend) // sub was also closed
134+
assertEquals(1, sub.receive()) // 1st element received
135+
assertEquals(2, sub.receive()) // 2nd element received
136+
assertFailsWith<TestException> { sub.receive() } // then closed with exception
137+
yield() // to cancel broadcast
138+
finish(6)
139+
}
37140
}

0 commit comments

Comments
 (0)