Skip to content

Commit 173c39c

Browse files
committed
WIP: Introduce broadcast coroutine builder;
support `BroadcastChannel.cancel` method to drop the buffer. Fixes #280
1 parent 45bcb0b commit 173c39c

File tree

8 files changed

+229
-28
lines changed

8 files changed

+229
-28
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
2021
import kotlinx.coroutines.experimental.intrinsics.*
2122
import kotlinx.coroutines.experimental.selects.*
2223
import kotlin.coroutines.experimental.*
@@ -71,6 +72,13 @@ interface ActorJob<in E> : SendChannel<E> {
7172
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
7273
* the resulting channel becomes _failed_, so that any attempt to send to such a channel throws exception.
7374
*
75+
* The kind of the resulting channel depends on the specified [capacity] parameter:
76+
* * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer;
77+
* * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size;
78+
* * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends;
79+
* * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`;
80+
* * otherwise -- throws [IllegalArgumentException].
81+
*
7482
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
7583
*
7684
* ### Using actors

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+16-4
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import kotlin.concurrent.*
2626
* Sender suspends only when buffer is full due to one of the receives being slow to consume and
2727
* receiver suspends only when buffer is empty.
2828
*
29-
* Note, that elements that are sent to the broadcast channel while there are no [openSubscription] subscribers are immediately
30-
* lost.
29+
* **Note**, that elements that are sent to this channel while there are no
30+
* [openSubscription] subscribers are immediately lost.
3131
*
3232
* This channel is created by `BroadcastChannel(capacity)` factory function invocation.
3333
*
@@ -69,17 +69,22 @@ class ArrayBroadcastChannel<E>(
6969
override val isBufferAlwaysFull: Boolean get() = false
7070
override val isBufferFull: Boolean get() = size >= capacity
7171

72-
override fun openSubscription(): SubscriptionReceiveChannel<E> =
72+
public override fun openSubscription(): SubscriptionReceiveChannel<E> =
7373
Subscriber(this).also {
7474
updateHead(addSub = it)
7575
}
7676

77-
override fun close(cause: Throwable?): Boolean {
77+
public override fun close(cause: Throwable?): Boolean {
7878
if (!super.close(cause)) return false
7979
checkSubOffers()
8080
return true
8181
}
8282

83+
public override fun cancel(cause: Throwable?): Boolean =
84+
close(cause).also {
85+
for (sub in subs) sub.cancel(cause)
86+
}
87+
8388
// result is `OFFER_SUCCESS | OFFER_FAILED | Closed`
8489
override fun offerInternal(element: E): Any {
8590
bufferLock.withLock {
@@ -211,8 +216,15 @@ class ArrayBroadcastChannel<E>(
211216
override fun cancel(cause: Throwable?): Boolean =
212217
close(cause).also { closed ->
213218
if (closed) broadcastChannel.updateHead(removeSub = this)
219+
clearBuffer()
214220
}
215221

222+
private fun clearBuffer() {
223+
subLock.withLock {
224+
subHead = broadcastChannel.tail
225+
}
226+
}
227+
216228
// returns true if subHead was updated and broadcast channel's head must be checked
217229
// this method is lock-free (it never waits on lock)
218230
@Suppress("UNCHECKED_CAST")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/*
18+
* Copyright 2016-2017 JetBrains s.r.o.
19+
*
20+
* Licensed under the Apache License, Version 2.0 (the "License");
21+
* you may not use this file except in compliance with the License.
22+
* You may obtain a copy of the License at
23+
*
24+
* http://www.apache.org/licenses/LICENSE-2.0
25+
*
26+
* Unless required by applicable law or agreed to in writing, software
27+
* distributed under the License is distributed on an "AS IS" BASIS,
28+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29+
* See the License for the specific language governing permissions and
30+
* limitations under the License.
31+
*/
32+
33+
/*
34+
* Copyright 2016-2017 JetBrains s.r.o.
35+
*
36+
* Licensed under the Apache License, Version 2.0 (the "License");
37+
* you may not use this file except in compliance with the License.
38+
* You may obtain a copy of the License at
39+
*
40+
* http://www.apache.org/licenses/LICENSE-2.0
41+
*
42+
* Unless required by applicable law or agreed to in writing, software
43+
* distributed under the License is distributed on an "AS IS" BASIS,
44+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
45+
* See the License for the specific language governing permissions and
46+
* limitations under the License.
47+
*/
48+
49+
package kotlinx.coroutines.experimental.channels
50+
51+
import kotlinx.coroutines.experimental.*
52+
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
53+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
54+
import kotlinx.coroutines.experimental.intrinsics.*
55+
import kotlin.coroutines.experimental.*
56+
57+
/**
58+
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
59+
* and returns a reference to the coroutine as a [BroadcastChannel]. This resulting
60+
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
61+
*
62+
* The scope of the coroutine contains [ProducerScope] interface, which implements
63+
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
64+
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
65+
* when the coroutine completes.
66+
*
67+
* The [context] for the new coroutine can be explicitly specified.
68+
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
69+
* The [coroutineContext] of the parent coroutine may be used,
70+
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
71+
* The parent job may be also explicitly specified using [parent] parameter.
72+
*
73+
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
74+
*
75+
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
76+
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
77+
*
78+
* The kind of the resulting channel depends on the specified [capacity] parameter:
79+
* * when `capacity` positive (1 by default), but less than [UNLIMITED] -- uses [ArrayBroadcastChannel]
80+
* * when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
81+
* * otherwise -- throws [IllegalArgumentException].
82+
*
83+
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
84+
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
85+
* This ensures that the first subscriber does not miss any sent elements.
86+
* However, later subscribers may miss elements.
87+
*
88+
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
89+
*
90+
* @param context context of the coroutine. The default value is [DefaultDispatcher].
91+
* @param capacity capacity of the channel's buffer (1 by default).
92+
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
93+
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
94+
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
95+
* @param block the coroutine code.
96+
*/
97+
public fun <E> broadcast(
98+
context: CoroutineContext = DefaultDispatcher,
99+
capacity: Int = 1,
100+
start: CoroutineStart = CoroutineStart.LAZY,
101+
parent: Job? = null,
102+
onCompletion: CompletionHandler? = null,
103+
block: suspend ProducerScope<E>.() -> Unit
104+
): BroadcastChannel<E> {
105+
val channel = BroadcastChannel<E>(capacity)
106+
val newContext = newCoroutineContext(context, parent)
107+
val coroutine = if (start.isLazy)
108+
LazyBroadcastCoroutine(newContext, channel, block) else
109+
BroadcastCoroutine(newContext, channel, active = true)
110+
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
111+
coroutine.start(start, coroutine, block)
112+
return coroutine
113+
}
114+
115+
private open class BroadcastCoroutine<E>(
116+
parentContext: CoroutineContext,
117+
protected val _channel: BroadcastChannel<E>,
118+
active: Boolean
119+
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
120+
override val channel: SendChannel<E>
121+
get() = this
122+
123+
override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
124+
val cause = exceptionally?.cause
125+
val processed = when (exceptionally) {
126+
is Cancelled -> _channel.cancel(cause) // producer coroutine was cancelled -- cancel channel
127+
else -> _channel.close(cause) // producer coroutine has completed -- close channel
128+
}
129+
if (!processed && cause != null)
130+
handleCoroutineException(context, cause)
131+
}
132+
}
133+
134+
private class LazyBroadcastCoroutine<E>(
135+
parentContext: CoroutineContext,
136+
channel: BroadcastChannel<E>,
137+
private val block: suspend ProducerScope<E>.() -> Unit
138+
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
139+
override fun openSubscription(): SubscriptionReceiveChannel<E> {
140+
// open subscription _first_
141+
val subscription = _channel.openSubscription()
142+
// then start coroutine
143+
start()
144+
return subscription
145+
}
146+
147+
override fun onStart() {
148+
block.startCoroutineCancellable(this, this)
149+
}
150+
}
151+

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+14-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2020
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
21-
import java.io.Closeable
21+
import java.io.*
2222

2323
/**
2424
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
@@ -55,13 +55,25 @@ public interface BroadcastChannel<E> : SendChannel<E> {
5555
@Deprecated(message = "Renamed to `openSubscription`",
5656
replaceWith = ReplaceWith("openSubscription()"))
5757
public fun open(): SubscriptionReceiveChannel<E> = openSubscription()
58+
59+
/**
60+
* Cancels reception of remaining elements from this channel. This function closes the channel with
61+
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
62+
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
63+
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
64+
*
65+
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
66+
* receive on a failed channel throw the specified [cause] exception.
67+
*/
68+
public fun cancel(cause: Throwable? = null): Boolean
5869
}
5970

6071
/**
6172
* Creates a broadcast channel with the specified buffer capacity.
6273
*
6374
* The resulting channel type depends on the specified [capacity] parameter:
64-
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel];
75+
* * when `capacity` positive, but less than [UNLIMITED] -- creates [ArrayBroadcastChannel]
76+
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
6577
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
6678
* * otherwise -- throws [IllegalArgumentException].
6779
*/

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

+2-7
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.CancellationException
20-
import kotlinx.coroutines.experimental.CoroutineScope
21-
import kotlinx.coroutines.experimental.Job
19+
import kotlinx.coroutines.experimental.*
2220
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2321
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
24-
import kotlinx.coroutines.experimental.selects.SelectClause1
25-
import kotlinx.coroutines.experimental.selects.SelectClause2
26-
import kotlinx.coroutines.experimental.selects.select
27-
import kotlinx.coroutines.experimental.yield
22+
import kotlinx.coroutines.experimental.selects.*
2823

2924
/**
3025
* Sender's interface to [Channel].

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+18-15
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
21-
import kotlinx.coroutines.experimental.internal.Symbol
22-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
23-
import kotlinx.coroutines.experimental.selects.SelectClause2
24-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.internal.*
21+
import kotlinx.coroutines.experimental.intrinsics.*
22+
import kotlinx.coroutines.experimental.selects.*
2523

2624
/**
2725
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -111,12 +109,12 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
111109
}
112110
}
113111

114-
override val isClosedForSend: Boolean get() = _state.value is Closed
115-
override val isFull: Boolean get() = false
112+
public override val isClosedForSend: Boolean get() = _state.value is Closed
113+
public override val isFull: Boolean get() = false
116114

117115
@Suppress("UNCHECKED_CAST")
118-
override fun openSubscription(): SubscriptionReceiveChannel<E> {
119-
val subscriber = Subscriber<E>(this)
116+
public override fun openSubscription(): SubscriptionReceiveChannel<E> {
117+
val subscriber = Subscriber(this)
120118
_state.loop { state ->
121119
when (state) {
122120
is Closed -> {
@@ -151,7 +149,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
151149
}
152150

153151
private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
154-
if (list == null) return Array<Subscriber<E>>(1) { subscriber }
152+
if (list == null) return Array(1) { subscriber }
155153
return list + subscriber
156154
}
157155

@@ -168,7 +166,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
168166
}
169167

170168
@Suppress("UNCHECKED_CAST")
171-
override fun close(cause: Throwable?): Boolean {
169+
public override fun close(cause: Throwable?): Boolean {
172170
_state.loop { state ->
173171
when (state) {
174172
is Closed -> return false
@@ -184,12 +182,17 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
184182
}
185183
}
186184

185+
/**
186+
* Closes this broadcast channel. Same as [close].
187+
*/
188+
public override fun cancel(cause: Throwable?): Boolean = close(cause)
189+
187190
/**
188191
* Sends the value to all subscribed receives and stores this value as the most recent state for
189192
* future subscribers. This implementation never suspends.
190193
* It throws exception if the channel [isClosedForSend] (see [close] for details).
191194
*/
192-
suspend override fun send(element: E) {
195+
public override suspend fun send(element: E) {
193196
offerInternal(element)?.let { throw it.sendException }
194197
}
195198

@@ -198,7 +201,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
198201
* future subscribers. This implementation always returns `true`.
199202
* It throws exception if the channel [isClosedForSend] (see [close] for details).
200203
*/
201-
override fun offer(element: E): Boolean {
204+
public override fun offer(element: E): Boolean {
202205
offerInternal(element)?.let { throw it.sendException }
203206
return true
204207
}
@@ -230,7 +233,7 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
230233
}
231234
}
232235

233-
override val onSend: SelectClause2<E, SendChannel<E>>
236+
public override val onSend: SelectClause2<E, SendChannel<E>>
234237
get() = object : SelectClause2<E, SendChannel<E>> {
235238
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
236239
registerSelectSend(select, param, block)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
2021
import kotlin.coroutines.experimental.*
2122

2223
/**
@@ -65,6 +66,13 @@ interface ProducerJob<out E> : ReceiveChannel<E>, Job {
6566
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
6667
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
6768
*
69+
* The kind of the resulting channel depends on the specified [capacity] parameter:
70+
* * when `capacity` is 0 (default) -- uses [RendezvousChannel] without a buffer;
71+
* * when `capacity` is [Channel.UNLIMITED] -- uses [LinkedListChannel] with buffer of unlimited size;
72+
* * when `capacity` is [Channel.CONFLATED] -- uses [ConflatedChannel] that conflates back-to-back sends;
73+
* * when `capacity` is positive, but less than [UNLIMITED] -- uses [ArrayChannel] with a buffer of the specified `capacity`;
74+
* * otherwise -- throws [IllegalArgumentException].
75+
*
6876
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
6977
*
7078
* @param context context of the coroutine. The default value is [DefaultDispatcher].

0 commit comments

Comments
 (0)