1
1
/*
2
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3
3
*/
4
4
5
5
package kotlinx.coroutines.channels
@@ -10,28 +10,51 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
10
10
import kotlinx.coroutines.intrinsics.*
11
11
import kotlin.coroutines.*
12
12
import kotlin.coroutines.intrinsics.*
13
+ import kotlin.native.concurrent.*
13
14
14
15
/* *
15
16
* Broadcasts all elements of the channel.
17
+ * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
16
18
*
17
19
* The kind of the resulting channel depends on the specified [capacity] parameter:
18
20
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
19
21
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
20
22
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
21
23
* otherwise -- throws [IllegalArgumentException].
22
24
*
25
+ * ### Cancelling broadcast
26
+ *
27
+ * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
28
+ *
29
+ * Do not use [close][BroadcastChannel.close] on the resulting channel.
30
+ * It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
31
+ * but it is not as prompt.
32
+ *
33
+ * ### Obsolete
34
+ *
35
+ * This function has an inappropriate result type of [BroadcastChannel] which provides
36
+ * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
37
+ * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
38
+ * sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
39
+ *
23
40
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
24
41
*/
42
+ @ObsoleteCoroutinesApi // since version 1.4.0
25
43
fun <E > ReceiveChannel<E>.broadcast (
26
44
capacity : Int = 1,
27
45
start : CoroutineStart = CoroutineStart .LAZY
28
46
): BroadcastChannel <E > =
29
- GlobalScope .broadcast(Dispatchers .Unconfined , capacity = capacity, start = start, onCompletion = consumes()) {
47
+ OperatorScope .broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
48
+ // We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
49
+ // which passes all exceptions upstream to the source ReceiveChannel
30
50
for (e in this @broadcast) {
31
51
send(e)
32
52
}
33
53
}
34
54
55
+ @SharedImmutable
56
+ private val OperatorScope = GlobalScope + Dispatchers .Unconfined + CoroutineExceptionHandler { _, _ -> }
57
+
35
58
/* *
36
59
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
37
60
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
@@ -63,12 +86,28 @@ fun <E> ReceiveChannel<E>.broadcast(
63
86
*
64
87
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
65
88
*
89
+ * ### Cancelling broadcast
90
+ *
91
+ * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
92
+ *
93
+ * Do not use [close][BroadcastChannel.close] on the resulting channel.
94
+ * It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
95
+ * coroutine is doing something else.
96
+ *
97
+ * ### Obsolete
98
+ *
99
+ * This function has an inappropriate result type of [BroadcastChannel] which provides
100
+ * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
101
+ * the broadcasting coroutine in hard-to-specify ways. It will be replaced with
102
+ * sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
103
+ *
66
104
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
67
105
* @param capacity capacity of the channel's buffer (1 by default).
68
106
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
69
107
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
70
108
* @param block the coroutine code.
71
109
*/
110
+ @ObsoleteCoroutinesApi // since version 1.4.0
72
111
public fun <E > CoroutineScope.broadcast (
73
112
context : CoroutineContext = EmptyCoroutineContext ,
74
113
capacity : Int = 1,
@@ -107,8 +146,9 @@ private open class BroadcastCoroutine<E>(
107
146
}
108
147
109
148
override fun cancelInternal (cause : Throwable ) {
110
- _channel .cancel(cause.toCancellationException()) // cancel the channel
111
- cancelCoroutine(cause) // cancel the job
149
+ val exception = cause.toCancellationException()
150
+ _channel .cancel(exception) // cancel the channel
151
+ cancelCoroutine(exception) // cancel the job
112
152
}
113
153
114
154
override fun onCompleted (value : Unit ) {
@@ -119,6 +159,13 @@ private open class BroadcastCoroutine<E>(
119
159
val processed = _channel .close(cause)
120
160
if (! processed && ! handled) handleCoroutineException(context, cause)
121
161
}
162
+
163
+ // The BroadcastChannel could be also closed
164
+ override fun close (cause : Throwable ? ): Boolean {
165
+ val result = _channel .close(cause)
166
+ start() // start coroutine if it was not started yet
167
+ return result
168
+ }
122
169
}
123
170
124
171
private class LazyBroadcastCoroutine <E >(
0 commit comments