Skip to content

Commit b39c50d

Browse files
committed
Introduce broadcast coroutine builder;
support `BroadcastChannel.cancel` method to drop the buffer; Introduce ReceiveChannel.broadcast() extension. Fixes #280
1 parent a576a3f commit b39c50d

File tree

10 files changed

+276
-20
lines changed

10 files changed

+276
-20
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+11
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc
540540

541541
public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel {
542542
public fun <init> (I)V
543+
public fun cancel (Ljava/lang/Throwable;)Z
543544
public fun close (Ljava/lang/Throwable;)Z
544545
public final fun getCapacity ()I
545546
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
@@ -564,12 +565,14 @@ public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/cor
564565

565566
public abstract interface class kotlinx/coroutines/experimental/channels/BroadcastChannel : kotlinx/coroutines/experimental/channels/SendChannel {
566567
public static final field Factory Lkotlinx/coroutines/experimental/channels/BroadcastChannel$Factory;
568+
public abstract fun cancel (Ljava/lang/Throwable;)Z
567569
public abstract fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
568570
public abstract fun openSubscription ()Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
569571
public abstract synthetic fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
570572
}
571573

572574
public final class kotlinx/coroutines/experimental/channels/BroadcastChannel$DefaultImpls {
575+
public static synthetic fun cancel$default (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
573576
public static fun open (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
574577
public static synthetic fun openSubscription (Lkotlinx/coroutines/experimental/channels/BroadcastChannel;)Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
575578
}
@@ -582,6 +585,13 @@ public final class kotlinx/coroutines/experimental/channels/BroadcastChannelKt {
582585
public static final fun BroadcastChannel (I)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
583586
}
584587

588+
public final class kotlinx/coroutines/experimental/channels/BroadcastKt {
589+
public static final fun broadcast (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
590+
public static final fun broadcast (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
591+
public static synthetic fun broadcast$default (Lkotlin/coroutines/experimental/CoroutineContext;ILkotlinx/coroutines/experimental/CoroutineStart;Lkotlinx/coroutines/experimental/Job;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
592+
public static synthetic fun broadcast$default (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;ILkotlinx/coroutines/experimental/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/experimental/channels/BroadcastChannel;
593+
}
594+
585595
public abstract interface class kotlinx/coroutines/experimental/channels/Channel : kotlinx/coroutines/experimental/channels/ReceiveChannel, kotlinx/coroutines/experimental/channels/SendChannel {
586596
public static final field CONFLATED I
587597
public static final field Factory Lkotlinx/coroutines/experimental/channels/Channel$Factory;
@@ -715,6 +725,7 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
715725
public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol;
716726
public fun <init> ()V
717727
public fun <init> (Ljava/lang/Object;)V
728+
public fun cancel (Ljava/lang/Throwable;)Z
718729
public fun close (Ljava/lang/Throwable;)Z
719730
public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
720731
public final fun getValue ()Ljava/lang/Object;

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+16-4
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import kotlinx.coroutines.experimental.selects.*
2525
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
2626
* receiver suspends only when buffer is empty.
2727
*
28-
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
29-
* lost.
28+
* **Note**, that elements that are sent to this channel while there are no
29+
* [openSubscription] subscribers are immediately lost.
3030
*
3131
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
3232
*
@@ -68,17 +68,22 @@ class ArrayBroadcastChannel<E>(
6868
override val isBufferAlwaysFull: Boolean get() = false
6969
override val isBufferFull: Boolean get() = size >= capacity
7070

71-
override fun openSubscription(): ReceiveChannel<E> =
71+
public override fun openSubscription(): ReceiveChannel<E> =
7272
Subscriber(this).also {
7373
updateHead(addSub = it)
7474
}
7575

76-
override fun close(cause: Throwable?): Boolean {
76+
public override fun close(cause: Throwable?): Boolean {
7777
if (!super.close(cause)) return false
7878
checkSubOffers()
7979
return true
8080
}
8181

82+
public override fun cancel(cause: Throwable?): Boolean =
83+
close(cause).also {
84+
for (sub in subs) sub.cancel(cause)
85+
}
86+
8287
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
8388
override fun offerInternal(element: E): Any {
8489
bufferLock.withLock {
@@ -210,8 +215,15 @@ class ArrayBroadcastChannel<E>(
210215
override fun cancel(cause: Throwable?): Boolean =
211216
close(cause).also { closed ->
212217
if (closed) broadcastChannel.updateHead(removeSub = this)
218+
clearBuffer()
213219
}
214220

221+
private fun clearBuffer() {
222+
subLock.withLock {
223+
subHead = broadcastChannel.tail
224+
}
225+
}
226+
215227
// returns true if subHead was updated and broadcast channel's head must be checked
216228
// this method is lock-free (it never waits on lock)
217229
@Suppress("UNCHECKED_CAST")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
21+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
22+
import kotlinx.coroutines.experimental.intrinsics.*
23+
import kotlin.coroutines.experimental.*
24+
25+
/**
26+
* Broadcasts all elements of the channel.
27+
*
28+
* @param capacity capacity of the channel's buffer (1 by default).
29+
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
30+
*/
31+
fun <E> ReceiveChannel<E>.broadcast(
32+
capacity: Int = 1,
33+
start: CoroutineStart = CoroutineStart.LAZY
34+
) : BroadcastChannel<E> =
35+
broadcast(Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
36+
for (e in this@broadcast) {
37+
send(e)
38+
}
39+
}
40+
41+
/**
42+
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
43+
* and returns a reference to the coroutine as a [BroadcastChannel]. This resulting
44+
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
45+
*
46+
* The scope of the coroutine contains [ProducerScope] interface, which implements
47+
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
48+
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
49+
* when the coroutine completes.
50+
*
51+
* The [context] for the new coroutine can be explicitly specified.
52+
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
53+
* The [coroutineContext] of the parent coroutine may be used,
54+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
55+
* The parent job may be also explicitly specified using [parent] parameter.
56+
*
57+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
58+
*
59+
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
60+
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
61+
*
62+
* The kind of the resulting channel depends on the specified [capacity] parameter:
63+
* * when `capacity` positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel]
64+
* * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
65+
* * otherwise -- throws [IllegalArgumentException].
66+
*
67+
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
68+
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
69+
* This ensures that the first subscriber does not miss any sent elements.
70+
* However, later subscribers may miss elements.
71+
*
72+
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
73+
*
74+
* @param context context of the coroutine. The default value is [DefaultDispatcher].
75+
* @param capacity capacity of the channel's buffer (1 by default).
76+
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
77+
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
78+
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
79+
* @param block the coroutine code.
80+
*/
81+
public fun <E> broadcast(
82+
context: CoroutineContext = DefaultDispatcher,
83+
capacity: Int = 1,
84+
start: CoroutineStart = CoroutineStart.LAZY,
85+
parent: Job? = null,
86+
onCompletion: CompletionHandler? = null,
87+
block: suspend ProducerScope<E>.() -> Unit
88+
): BroadcastChannel<E> {
89+
val channel = BroadcastChannel<E>(capacity)
90+
val newContext = newCoroutineContext(context, parent)
91+
val coroutine = if (start.isLazy)
92+
LazyBroadcastCoroutine(newContext, channel, block) else
93+
BroadcastCoroutine(newContext, channel, active = true)
94+
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
95+
coroutine.start(start, coroutine, block)
96+
return coroutine
97+
}
98+
99+
private open class BroadcastCoroutine<E>(
100+
parentContext: CoroutineContext,
101+
protected val _channel: BroadcastChannel<E>,
102+
active: Boolean
103+
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
104+
override val channel: SendChannel<E>
105+
get() = this
106+
107+
@Suppress("RETURN_TYPE_MISMATCH_ON_OVERRIDE")
108+
public override fun openSubscription(): ReceiveChannel<E> = _channel.openSubscription()
109+
110+
public override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
111+
112+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
113+
val cause = exceptionally?.cause
114+
val processed = when (exceptionally) {
115+
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
116+
else -> _channel.close(cause) // producer coroutine has completed -- close channel
117+
}
118+
if (!processed && cause != null)
119+
handleCoroutineException(context, cause)
120+
}
121+
122+
123+
// Workaround for KT-23094
124+
override suspend fun send(element: E) = _channel.send(element)
125+
}
126+
127+
private class LazyBroadcastCoroutine<E>(
128+
parentContext: CoroutineContext,
129+
channel: BroadcastChannel<E>,
130+
private val block: suspend ProducerScope<E>.() -> Unit
131+
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
132+
override fun openSubscription(): ReceiveChannel<E> {
133+
// open subscription _first_
134+
val subscription = _channel.openSubscription()
135+
// then start coroutine
136+
start()
137+
return subscription
138+
}
139+
140+
override fun onStart() {
141+
block.startCoroutineCancellable(this, this)
142+
}
143+
}
144+

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+13-1
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,25 @@ public interface BroadcastChannel<E> : SendChannel<E> {
6565
@Deprecated(message = "Renamed to `openSubscription`",
6666
replaceWith = ReplaceWith("openSubscription()"))
6767
public fun open(): SubscriptionReceiveChannel<E> = openSubscription() as SubscriptionReceiveChannel<E>
68+
69+
/**
70+
* Cancels reception of remaining elements from this channel. This function closes the channel with
71+
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
72+
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
73+
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
74+
*
75+
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
76+
* receive on a failed channel throw the specified [cause] exception.
77+
*/
78+
public fun cancel(cause: Throwable? = null): Boolean
6879
}
6980

7081
/**
7182
* Creates a broadcast channel with the specified buffer capacity.
7283
*
7384
* The resulting channel type depends on the specified [capacity] parameter:
74-
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
85+
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
86+
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
7587
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
7688
* * otherwise -- throws [IllegalArgumentException].
7789
*/

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

+2-7
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.CancellationException
20-
import kotlinx.coroutines.experimental.CoroutineScope
21-
import kotlinx.coroutines.experimental.Job
19+
import kotlinx.coroutines.experimental.*
2220
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2321
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
24-
import kotlinx.coroutines.experimental.selects.SelectClause1
25-
import kotlinx.coroutines.experimental.selects.SelectClause2
26-
import kotlinx.coroutines.experimental.selects.select
27-
import kotlinx.coroutines.experimental.yield
22+
import kotlinx.coroutines.experimental.selects.*
2823

2924
/**
3025
* Sender's interface to [Channel].

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+13-8
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
110110
}
111111
}
112112

113-
override val isClosedForSend: Boolean get() = _state.value is Closed
114-
override val isFull: Boolean get() = false
113+
public override val isClosedForSend: Boolean get() = _state.value is Closed
114+
public override val isFull: Boolean get() = false
115115

116116
@Suppress("UNCHECKED_CAST")
117-
override fun openSubscription(): ReceiveChannel<E> {
117+
public override fun openSubscription(): ReceiveChannel<E> {
118118
val subscriber = Subscriber<E>(this)
119119
_state.loop { state ->
120120
when (state) {
@@ -150,7 +150,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
150150
}
151151

152152
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
153-
if (list == null) return Array<Subscriber<E>>(1) { subscriber }
153+
if (list == null) return Array(1) { subscriber }
154154
return list + subscriber
155155
}
156156

@@ -167,7 +167,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
167167
}
168168

169169
@Suppress("UNCHECKED_CAST")
170-
override fun close(cause: Throwable?): Boolean {
170+
public override fun close(cause: Throwable?): Boolean {
171171
_state.loop { state ->
172172
when (state) {
173173
is Closed -> return false
@@ -183,12 +183,17 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
183183
}
184184
}
185185

186+
/**
187+
* Closes this broadcast channel. Same as [close].
188+
*/
189+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
190+
186191
/**
187192
* Sends the value to all subscribed receives and stores this value as the most recent state for
188193
* future subscribers. This implementation never suspends.
189194
* It throws exception if the channel [isClosedForSend] (see [close] for details).
190195
*/
191-
suspend override fun send(element: E) {
196+
public override suspend fun send(element: E) {
192197
offerInternal(element)?.let { throw it.sendException }
193198
}
194199

@@ -197,7 +202,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
197202
* future subscribers. This implementation always returns `true`.
198203
* It throws exception if the channel [isClosedForSend] (see [close] for details).
199204
*/
200-
override fun offer(element: E): Boolean {
205+
public override fun offer(element: E): Boolean {
201206
offerInternal(element)?.let { throw it.sendException }
202207
return true
203208
}
@@ -229,7 +234,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
229234
}
230235
}
231236

232-
override val onSend: SelectClause2<E, SendChannel<E>>
237+
public override val onSend: SelectClause2<E, SendChannel<E>>
233238
get() = object : SelectClause2<E, SendChannel<E>> {
234239
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
235240
registerSelectSend(select, param, block)

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
2021
import kotlin.coroutines.experimental.*
2122

2223
/**
@@ -65,6 +66,13 @@ interface ProducerJob<out E> : ReceiveChannel<E>, Job {
6566
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
6667
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
6768
*
69+
* The kind of the resulting channel depends on the specified [capacity] parameter:
70+
* * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer;
71+
* * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size;
72+
* * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends;
73+
* * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`;
74+
* * otherwise -- throws [IllegalArgumentException].
75+
*
6876
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
6977
*
7078
* @param context context of the coroutine. The default value is [DefaultDispatcher].

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -184,5 +184,17 @@ class ArrayBroadcastChannelTest : TestBase() {
184184
subscription.receiveOrNull()
185185
}
186186

187+
@Test
188+
fun testReceiveNoneAfterCancel() = runTest {
189+
val channel = BroadcastChannel<Int>(10)
190+
val sub = channel.openSubscription()
191+
// generate into buffer & cancel
192+
for (x in 1..5) channel.send(x)
193+
channel.cancel()
194+
assertTrue(channel.isClosedForSend)
195+
assertTrue(sub.isClosedForReceive)
196+
check(sub.receiveOrNull() == null)
197+
}
198+
187199
private class TestException : Exception()
188200
}

0 commit comments

Comments
 (0)