Skip to content

Commit f41bd3e

Browse files
committed
ReceiveChannel.broadcast shall properly consume source channel when closed
Fixes #1713
1 parent cc3d8c4 commit f41bd3e

File tree

2 files changed

+59
-4
lines changed

2 files changed

+59
-4
lines changed

kotlinx-coroutines-core/common/src/channels/Broadcast.kt

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
@@ -13,6 +13,7 @@ import kotlin.coroutines.intrinsics.*
1313

1414
/**
1515
* Broadcasts all elements of the channel.
16+
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
1617
*
1718
* The kind of the resulting channel depends on the specified [capacity] parameter:
1819
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
@@ -28,7 +29,12 @@ fun <E> ReceiveChannel<E>.broadcast(
2829
): BroadcastChannel<E> =
2930
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
3031
for (e in this@broadcast) {
31-
send(e)
32+
try {
33+
send(e)
34+
} catch (e: ClosedSendChannelException) {
35+
// the resulting BroadcastChannel was closed -> just break the sending loop
36+
break
37+
}
3238
}
3339
}
3440

@@ -119,6 +125,13 @@ private open class BroadcastCoroutine<E>(
119125
val processed = _channel.close(cause)
120126
if (!processed && !handled) handleCoroutineException(context, cause)
121127
}
128+
129+
// The BroadcastChannel could be also closed
130+
override fun close(cause: Throwable?): Boolean {
131+
val result = _channel.close(cause)
132+
cancelCoroutine(cause)
133+
return result
134+
}
122135
}
123136

124137
private class LazyBroadcastCoroutine<E>(

kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt

+44-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.channels
66

77
import kotlinx.coroutines.*
8-
import kotlin.coroutines.*
98
import kotlin.test.*
109

1110
class BroadcastTest : TestBase() {
@@ -34,4 +33,47 @@ class BroadcastTest : TestBase() {
3433
yield() // to broadcast
3534
finish(11)
3635
}
36+
37+
/**
38+
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
39+
*/
40+
@Test
41+
fun testChannelBroadcastLazyClose() = runTest {
42+
expect(1)
43+
val a = produce {
44+
expect(3)
45+
try {
46+
send("MSG")
47+
} finally {
48+
expect(5)
49+
}
50+
expectUnreached()
51+
}
52+
expect(2)
53+
yield() // to produce
54+
val b = a.broadcast()
55+
b.close()
56+
expect(4)
57+
yield() // to abort produce
58+
assertTrue(a.isClosedForReceive) // the source channel was consumed
59+
finish(6)
60+
}
61+
62+
@Test
63+
fun testChannelBroadcastEagerClose() = runTest {
64+
expect(1)
65+
val a = produce<Unit> {
66+
expect(3)
67+
yield() // back to main
68+
expectUnreached() // will be cancelled
69+
}
70+
expect(2)
71+
val b = a.broadcast(start = CoroutineStart.DEFAULT)
72+
yield() // to produce
73+
expect(4)
74+
b.close()
75+
yield() // to produce (cancelled)
76+
assertTrue(a.isClosedForReceive) // the source channel was consumed
77+
finish(5)
78+
}
3779
}

0 commit comments

Comments
 (0)