Skip to content

Commit 578e4a5

Browse files
committed
consumeAll renamed to cancel; cleaned up some channel operators;
dropped/deprecated ActorJob/ProducerJob, fixes #127
1 parent d685a3e commit 578e4a5

File tree

20 files changed

+274
-237
lines changed

20 files changed

+274
-237
lines changed

core/kotlinx-coroutines-core/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ Coroutine builder functions:
88
| ------------- | ------------- | ---------------- | ---------------
99
| [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result
1010
| [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result
11-
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ProducerJob][kotlinx.coroutines.experimental.channels.ProducerJob] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
12-
| [actor][kotlinx.coroutines.experimental.channels.actor] | [ActorJob][kotlinx.coroutines.experimental.channels.ActorJob] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
11+
| [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements
12+
| [actor][kotlinx.coroutines.experimental.channels.actor] | [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [ActorScope][kotlinx.coroutines.experimental.channels.ActorScope] | Processes a stream of messages
1313
| [runBlocking] | `T` | [CoroutineScope] | Blocks the thread while the coroutine runs
1414

1515
Coroutine dispatchers implementing [CoroutineDispatcher]:

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,7 @@ public open class JobSupport(active: Boolean) : Job, SelectClause0, SelectClause
898898

899899
protected open val hasCancellingState: Boolean get() = false
900900

901-
public final override fun cancel(cause: Throwable?): Boolean =
901+
public override fun cancel(cause: Throwable?): Boolean =
902902
if (hasCancellingState)
903903
makeCancelling(cause) else
904904
makeCancelled(cause)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+5-6
Original file line numberDiff line numberDiff line change
@@ -573,14 +573,13 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
573573
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
574574
}
575575

576-
override fun consumeAll() {
577-
close(cause = null)
578-
// now cleanup send queue
579-
consumeAllInternal()
580-
}
576+
override fun cancel(cause: Throwable?): Boolean =
577+
close(cause).also {
578+
cleanupSendQueueOnCancel()
579+
}
581580

582581
// Note: this function is invoked when channel is already closed
583-
protected open fun consumeAllInternal() {
582+
protected open fun cleanupSendQueueOnCancel() {
584583
val closed = closedForSend ?: error("Cannot happen")
585584
while (true) {
586585
val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Actor.kt

+5-10
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,11 @@ public interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
3737
}
3838

3939
/**
40-
* Return type for [actor] coroutine builder.
40+
* @suppress **Deprecated**: Use `SendChannel`.
4141
*/
42-
public interface ActorJob<in E> : Job, SendChannel<E> {
43-
/**
44-
* A reference to the mailbox channel that this coroutine is receiving messages from.
45-
* All the [SendChannel] functions on this interface delegate to
46-
* the channel instance returned by this function.
47-
*/
42+
@Deprecated(message = "Use `SendChannel`", replaceWith = ReplaceWith("SendChannel"))
43+
interface ActorJob<in E> : SendChannel<E> {
44+
@Deprecated(message = "Use SendChannel itself")
4845
val channel: SendChannel<E>
4946
}
5047

@@ -87,7 +84,7 @@ public fun <E> actor(
8784
capacity: Int = 0,
8885
start: CoroutineStart = CoroutineStart.DEFAULT,
8986
block: suspend ActorScope<E>.() -> Unit
90-
): ActorJob<E> {
87+
): SendChannel<E> {
9188
val newContext = newCoroutineContext(context)
9289
val channel = Channel<E>(capacity)
9390
val coroutine = if (start.isLazy)
@@ -109,8 +106,6 @@ private class LazyActorCoroutine<E>(
109106
channel: Channel<E>,
110107
private val block: suspend ActorScope<E>.() -> Unit
111108
) : ActorCoroutine<E>(parentContext, channel, active = false), SelectClause2<E, SendChannel<E>> {
112-
override val channel: Channel<E> get() = this
113-
114109
override fun onStart() {
115110
block.startCoroutineCancellable(this, this)
116111
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,10 @@ class ArrayBroadcastChannel<E>(
209209
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
210210
override val isBufferFull: Boolean get() = error("Should not be used")
211211

212-
override fun consumeAll() {
213-
if (close(cause = null))
214-
broadcastChannel.updateHead(removeSub = this)
215-
}
212+
override fun cancel(cause: Throwable?): Boolean =
213+
close(cause).also { closed ->
214+
if (closed) broadcastChannel.updateHead(removeSub = this)
215+
}
216216

217217
// returns true if subHead was updated and broadcast channel's head must be checked
218218
// this method is lock-free (it never waits on lock)

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public open class ArrayChannel<E>(
233233
}
234234

235235
// Note: this function is invoked when channel is already closed
236-
override fun consumeAllInternal() {
236+
override fun cleanupSendQueueOnCancel() {
237237
// clear buffer first
238238
lock.withLock {
239239
repeat(size) {
@@ -243,6 +243,6 @@ public open class ArrayChannel<E>(
243243
size = 0
244244
}
245245
// then clean all queued senders
246-
super.consumeAllInternal()
246+
super.cleanupSendQueueOnCancel()
247247
}
248248
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,11 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
7777
* Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
7878
* open subscription and to [close] it to unsubscribe.
7979
*
80-
* Note, that invocation of [consumeAll] also closes subscription.
80+
* Note, that invocation of [cancel] also closes subscription.
8181
*/
8282
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
8383
/**
84-
* Closes this subscription. This is a synonym for [consumeAll].
84+
* Closes this subscription. This is a synonym for [cancel].
8585
*/
86-
public override fun close() = consumeAll()
86+
public override fun close() { cancel() }
8787
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -196,16 +196,19 @@ public interface ReceiveChannel<out E> {
196196
public operator fun iterator(): ChannelIterator<E>
197197

198198
/**
199-
* Consumes all remaining elements from this channel. This function marks the channel as _consumed_
200-
* by closing it normally (unless it was already closed) and removes all buffered sent elements from it.
199+
* Cancels reception of remaining elements from this channel. This function closes the channel with
200+
* the specified cause (unless it was already closed) and removes all buffered sent elements from it.
201+
* This function returns `true` if the channel was not closed previously, or `false` otherwise.
201202
*
202203
* Immediately after invocation of this function [isClosedForReceive] and
203204
* [isClosedForSend][SendChannel.isClosedForSend]
204205
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
205206
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
206-
* [ClosedReceiveChannelException].
207+
* [ClosedReceiveChannelException] if it was cancelled without a cause.
208+
* A channel that was cancelled with non-null [cause] is called a _failed_ channel. Attempts to send or
209+
* receive on a failed channel throw the specified [cause] exception.
207210
*/
208-
public fun consumeAll()
211+
public fun cancel(cause: Throwable? = null): Boolean
209212
}
210213

211214
/**

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.AbstractCoroutine
20-
import kotlinx.coroutines.experimental.JobSupport
2120
import kotlinx.coroutines.experimental.handleCoroutineException
2221
import kotlin.coroutines.experimental.CoroutineContext
2322

2423
internal open class ChannelCoroutine<E>(
2524
parentContext: CoroutineContext,
26-
open val channel: Channel<E>,
25+
channel: Channel<E>,
2726
active: Boolean
2827
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by channel {
29-
override fun afterCompletion(state: Any?, mode: Int) {
30-
val cause = (state as? JobSupport.CompletedExceptionally)?.cause
28+
val channel: Channel<E>
29+
get() = this
30+
31+
override fun onCancellation(exceptionally: CompletedExceptionally?) {
32+
val cause = exceptionally?.cause
3133
if (!channel.close(cause) && cause != null)
3234
handleCoroutineException(context, cause)
3335
}
36+
37+
override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
3438
}

0 commit comments

Comments
 (0)