Skip to content

Commit f2a9aa5

Browse files
committed
Introduced ReceiveChannel.consumeAll; all operators on ReceiveChannel
fully consume the original channel (using a helper consume extension)
1 parent 32fdbea commit f2a9aa5

File tree

16 files changed

+564
-331
lines changed

16 files changed

+564
-331
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+29-1
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,11 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
398398
remove()
399399
}
400400

401+
override fun resumeSendClosed(closed: Closed<*>) {
402+
if (select.trySelect(null))
403+
select.resumeSelectCancellableWithException(closed.sendException)
404+
}
405+
401406
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
402407
}
403408

@@ -407,6 +412,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
407412
override val pollResult: Any? get() = element
408413
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
409414
override fun completeResumeSend(token: Any) { check(token === SEND_RESUMED) }
415+
override fun resumeSendClosed(closed: Closed<*>) {}
410416
}
411417
}
412418

@@ -567,6 +573,25 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
567573
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
568574
}
569575

576+
override fun consumeAll() {
577+
close(cause = null)
578+
// now cleanup send queue
579+
consumeAllInternal()
580+
}
581+
582+
// Note: this function is invoked when channel is already closed
583+
protected open fun consumeAllInternal() {
584+
val closed = closedForSend ?: error("Cannot happen")
585+
while (true) {
586+
val send = takeFirstSendOrPeekClosed() ?: error("Cannot happen")
587+
if (send is Closed<*>) {
588+
check(send === closed)
589+
return // cleaned
590+
}
591+
send.resumeSendClosed(closed)
592+
}
593+
}
594+
570595
public final override fun iterator(): ChannelIterator<E> = Itr(this)
571596

572597
// ------ registerSelectReceive ------
@@ -909,6 +934,7 @@ public interface Send {
909934
val pollResult: Any? // E | Closed
910935
fun tryResumeSend(idempotent: Any?): Any?
911936
fun completeResumeSend(token: Any)
937+
fun resumeSendClosed(closed: Closed<*>)
912938
}
913939

914940
/**
@@ -922,7 +948,7 @@ public interface ReceiveOrClosed<in E> {
922948
}
923949

924950
/**
925-
* Represents closed channel.
951+
* Represents sender for a specific element.
926952
* @suppress **This is unstable API and it is subject to change.**
927953
*/
928954
@Suppress("UNCHECKED_CAST")
@@ -932,6 +958,7 @@ public class SendElement(
932958
) : LockFreeLinkedListNode(), Send {
933959
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
934960
override fun completeResumeSend(token: Any) = cont.completeResume(token)
961+
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
935962
override fun toString(): String = "SendElement($pollResult)[$cont]"
936963
}
937964

@@ -951,6 +978,7 @@ public class Closed<in E>(
951978
override fun completeResumeSend(token: Any) { check(token === CLOSE_RESUMED) }
952979
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED
953980
override fun completeResumeReceive(token: Any) { check(token === CLOSE_RESUMED) }
981+
override fun resumeSendClosed(closed: Closed<*>) = error("Should be never invoked")
954982
override fun toString(): String = "Closed[$closeCause]"
955983
}
956984

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class ArrayBroadcastChannel<E>(
209209
override val isBufferAlwaysFull: Boolean get() = error("Should not be used")
210210
override val isBufferFull: Boolean get() = error("Should not be used")
211211

212-
override fun close() {
212+
override fun consumeAll() {
213213
if (close(cause = null))
214214
broadcastChannel.updateHead(removeSub = this)
215215
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+14
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,18 @@ public open class ArrayChannel<E>(
231231
send!!.completeResumeSend(token!!)
232232
return result
233233
}
234+
235+
// Note: this function is invoked when channel is already closed
236+
override fun consumeAllInternal() {
237+
// clear buffer first
238+
lock.withLock {
239+
repeat(size) {
240+
buffer[head] = 0
241+
head = (head + 1) % capacity
242+
}
243+
size = 0
244+
}
245+
// then clean all queued senders
246+
super.consumeAllInternal()
247+
}
234248
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,12 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
7676
/**
7777
* Return type for [BroadcastChannel.openSubscription] that can be used to [receive] elements from the
7878
* open subscription and to [close] it to unsubscribe.
79+
*
80+
* Note, that invocation of [consumeAll] also closes subscription.
7981
*/
8082
public interface SubscriptionReceiveChannel<out T> : ReceiveChannel<T>, Closeable {
8183
/**
82-
* Closes this subscription.
84+
* Closes this subscription. This is a synonym for [consumeAll].
8385
*/
84-
public override fun close()
86+
public override fun close() = consumeAll()
8587
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

+15-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ public interface SendChannel<in E> {
8787
/**
8888
* Closes this channel with an optional exceptional [cause].
8989
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
90-
* Conceptually, its sends a special "close token" over this channel. Immediately after invocation of this function
90+
* Conceptually, its sends a special "close token" over this channel.
91+
*
92+
* Immediately after invocation of this function
9193
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
9294
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
9395
* are received.
@@ -192,6 +194,18 @@ public interface ReceiveChannel<out E> {
192194
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
193195
*/
194196
public operator fun iterator(): ChannelIterator<E>
197+
198+
/**
199+
* Consumes all remaining elements from this channel. This function marks the channel as _consumed_
200+
* by closing it normally (unless it was already closed) and removes all buffered sent elements from it.
201+
*
202+
* Immediately after invocation of this function [isClosedForReceive] and
203+
* [isClosedForSend][SendChannel.isClosedForSend]
204+
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
205+
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
206+
* [ClosedReceiveChannelException].
207+
*/
208+
public fun consumeAll()
195209
}
196210

197211
/**

0 commit comments

Comments
 (0)