Skip to content

Commit f0bbeb8

Browse files
committed
Introduce ReceiveChannel.receiveOrClosed and ReceiveChannel.onReceiveOrClosed
Fixes #330
1 parent 0b886a3 commit f0bbeb8

File tree

9 files changed

+608
-30
lines changed

9 files changed

+608
-30
lines changed

build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) {
193193
"-Xuse-experimental=kotlin.experimental.ExperimentalTypeInference",
194194
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
195195
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
196-
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi"]
196+
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi",
197+
"-XXLanguage:+InlineClasses"]
197198

198199
}
199200
}

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+119-26
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import kotlin.jvm.*
1414

1515
/**
1616
* Abstract send channel. It is a base class for all send channel implementations.
17-
*
1817
*/
1918
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
2019
/** @suppress **This is unstable API and it is subject to change.** */
@@ -580,7 +579,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
580579

581580
@Suppress("UNCHECKED_CAST")
582581
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
583-
val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
582+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.THROW_ON_CLOSE)
584583
while (true) {
585584
if (enqueueReceive(receive)) {
586585
cont.initCancellability() // make it properly cancellable
@@ -628,7 +627,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
628627

629628
@Suppress("UNCHECKED_CAST")
630629
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
631-
val receive = ReceiveElement(cont, nullOnClose = true)
630+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.NULL_ON_CLOSE)
632631
while (true) {
633632
if (enqueueReceive(receive)) {
634633
cont.initCancellability() // make it properly cancellable
@@ -651,13 +650,42 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
651650
}
652651
}
653652

653+
@Suppress("UNCHECKED_CAST")
654+
public final override suspend fun receiveOrClosed(): ReceiveResult<E> {
655+
// fast path -- try poll non-blocking
656+
val result = pollInternal()
657+
if (result !== POLL_FAILED) {
658+
return result.toResult()
659+
}
660+
// slow-path does suspend
661+
return receiveOrClosedSuspend()
662+
}
663+
664+
@Suppress("UNCHECKED_CAST")
665+
private suspend fun receiveOrClosedSuspend(): ReceiveResult<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
666+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
667+
while (true) {
668+
if (enqueueReceive(receive)) {
669+
cont.initCancellability()
670+
removeReceiveOnCancel(cont, receive)
671+
return@sc
672+
}
673+
674+
val result = pollInternal()
675+
if (result is Closed<*> || result !== POLL_FAILED) {
676+
cont.resume(result.toResult<E>())
677+
return@sc
678+
}
679+
}
680+
}
681+
654682
@Suppress("UNCHECKED_CAST")
655683
public final override fun poll(): E? {
656684
val result = pollInternal()
657685
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
658686
}
659687

660-
override fun cancel(): Unit {
688+
override fun cancel() {
661689
cancel(null)
662690
}
663691

@@ -712,9 +740,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
712740

713741
private inner class TryEnqueueReceiveDesc<E, R>(
714742
select: SelectInstance<R>,
715-
block: suspend (E?) -> R,
716-
nullOnClose: Boolean
717-
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
743+
block: suspend (Any?) -> R,
744+
mode: ResumeMode
745+
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, mode)) {
718746
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
719747
if (affected is Send) return ENQUEUE_FAILED
720748
return null
@@ -746,7 +774,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
746774
while (true) {
747775
if (select.isSelected) return
748776
if (isEmpty) {
749-
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
777+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as (suspend (Any?) -> R), ResumeMode.THROW_ON_CLOSE)
750778
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
751779
when {
752780
enqueueResult === ALREADY_SELECTED -> return
@@ -780,7 +808,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
780808
while (true) {
781809
if (select.isSelected) return
782810
if (isEmpty) {
783-
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
811+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.NULL_ON_CLOSE)
784812
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
785813
when {
786814
enqueueResult === ALREADY_SELECTED -> return
@@ -810,6 +838,43 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
810838
}
811839
}
812840

841+
override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
842+
get() = object : SelectClause1<ReceiveResult<E>> {
843+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
844+
registerSelectReceiveOrClosed(select, block)
845+
}
846+
}
847+
848+
@Suppress("UNCHECKED_CAST")
849+
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
850+
while (true) {
851+
if (select.isSelected) return
852+
if (isEmpty) {
853+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.RECEIVE_RESULT)
854+
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
855+
when {
856+
enqueueResult === ALREADY_SELECTED -> return
857+
enqueueResult === ENQUEUE_FAILED -> {} // retry
858+
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
859+
}
860+
} else {
861+
val pollResult = pollSelectInternal(select)
862+
when {
863+
pollResult === ALREADY_SELECTED -> return
864+
pollResult === POLL_FAILED -> {} // retry
865+
pollResult is Closed<*> -> {
866+
block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion)
867+
}
868+
else -> {
869+
// selected successfully
870+
block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion)
871+
return
872+
}
873+
}
874+
}
875+
}
876+
}
877+
813878
// ------ protected ------
814879

815880
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
@@ -902,18 +967,31 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
902967
}
903968

904969
private class ReceiveElement<in E>(
905-
@JvmField val cont: CancellableContinuation<E?>,
906-
@JvmField val nullOnClose: Boolean
970+
@JvmField val cont: CancellableContinuation<Any?>,
971+
@JvmField val resumeMode: ResumeMode
907972
) : Receive<E>() {
908-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
973+
@Suppress("IMPLICIT_CAST_TO_ANY")
974+
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
975+
val resumeValue = when (resumeMode) {
976+
ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value)
977+
else -> value
978+
}
979+
return cont.tryResume(resumeValue, idempotent)
980+
}
981+
909982
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
910983
override fun resumeReceiveClosed(closed: Closed<*>) {
911-
if (closed.closeCause == null && nullOnClose)
912-
cont.resume(null)
913-
else
914-
cont.resumeWithException(closed.receiveException)
984+
when (resumeMode) {
985+
ResumeMode.THROW_ON_CLOSE -> cont.resumeWithException(closed.receiveException)
986+
ResumeMode.RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
987+
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
988+
cont.resume(null)
989+
} else {
990+
cont.resumeWithException(closed.receiveException)
991+
}
992+
}
915993
}
916-
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
994+
override fun toString(): String = "ReceiveElement[$cont,mode=$resumeMode]"
917995
}
918996

919997
private class ReceiveHasNext<E>(
@@ -957,26 +1035,27 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
9571035

9581036
private inner class ReceiveSelect<R, in E>(
9591037
@JvmField val select: SelectInstance<R>,
960-
@JvmField val block: suspend (E?) -> R,
961-
@JvmField val nullOnClose: Boolean
1038+
@JvmField val block: suspend (Any?) -> R,
1039+
@JvmField val mode: ResumeMode
9621040
) : Receive<E>(), DisposableHandle {
9631041
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
9641042
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
9651043

9661044
@Suppress("UNCHECKED_CAST")
9671045
override fun completeResumeReceive(token: Any) {
9681046
val value: E = (if (token === NULL_VALUE) null else token) as E
969-
block.startCoroutine(value, select.completion)
1047+
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ReceiveResult.value(value) else value, select.completion)
9701048
}
9711049

9721050
override fun resumeReceiveClosed(closed: Closed<*>) {
973-
if (select.trySelect(null)) {
974-
if (closed.closeCause == null && nullOnClose) {
1051+
if (!select.trySelect(null)) return
1052+
1053+
when (mode) {
1054+
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
1055+
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed<R>(closed.receiveException), select.completion)
1056+
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
9751057
block.startCoroutine(null, select.completion)
9761058
} else {
977-
// even though we are dispatching coroutine to process channel close on receive,
978-
// which is an atomically cancellable suspending function,
979-
// close is a final state, so we can use a cancellable resume mode
9801059
select.resumeSelectCancellableWithException(closed.receiveException)
9811060
}
9821061
}
@@ -991,7 +1070,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
9911070
onReceiveDequeued() // notify cancellation of receive
9921071
}
9931072

994-
override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
1073+
override fun toString(): String = "ReceiveSelect[$select,mode=$mode]"
9951074
}
9961075

9971076
private class IdempotentTokenValue<out E>(
@@ -1083,3 +1162,17 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
10831162
override val offerResult get() = OFFER_SUCCESS
10841163
abstract fun resumeReceiveClosed(closed: Closed<*>)
10851164
}
1165+
1166+
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
1167+
private inline fun <E> Any?.toResult(): ReceiveResult<E> =
1168+
if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E)
1169+
1170+
@Suppress("NOTHING_TO_INLINE")
1171+
private inline fun <E> Closed<*>.toResult(): ReceiveResult<E> = ReceiveResult.closed(receiveException)
1172+
1173+
// Marker for receive, receiveOrNull and receiveOrClosed
1174+
private enum class ResumeMode {
1175+
THROW_ON_CLOSE,
1176+
NULL_ON_CLOSE,
1177+
RECEIVE_RESULT
1178+
}

common/kotlinx-coroutines-core-common/src/channels/Channel.kt

+102-2
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public interface ReceiveChannel<out E> {
217217

218218
/**
219219
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
220-
* is received from the channel or selects with `null` if if the channel
220+
* is received from the channel or selects with `null` if the channel
221221
* [isClosedForReceive] without cause. The [select] invocation fails with
222222
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
223223
*
@@ -226,6 +226,37 @@ public interface ReceiveChannel<out E> {
226226
@ExperimentalCoroutinesApi
227227
public val onReceiveOrNull: SelectClause1<E?>
228228

229+
/**
230+
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
231+
* This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel
232+
* or [ReceiveResult] with close cause if channel was closed.
233+
*
234+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
235+
* function is suspended, this function immediately resumes with [CancellationException].
236+
*
237+
* *Cancellation of suspended receive is atomic* -- when this function
238+
* throws [CancellationException] it means that the element was not retrieved from this channel.
239+
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
240+
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
241+
* was already resumed and the continuation was posted for execution to the thread's queue.
242+
*
243+
* Note, that this function does not check for cancellation when it is not suspended.
244+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
245+
*
246+
* This function can be used in [select] invocation with [onReceiveOrClosed] clause.
247+
* Use [poll] to try receiving from this channel without waiting.
248+
*/
249+
@ExperimentalCoroutinesApi
250+
public suspend fun receiveOrClosed(): ReceiveResult<E>
251+
252+
/**
253+
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value
254+
* that is received from the channel or selects with [ReceiveResult] with a close cause if the channel
255+
* [isClosedForReceive].
256+
*/
257+
@ExperimentalCoroutinesApi
258+
public val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
259+
229260
/**
230261
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
231262
* or is [isClosedForReceive] without cause.
@@ -250,7 +281,7 @@ public interface ReceiveChannel<out E> {
250281
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
251282
* [ClosedReceiveChannelException].
252283
*/
253-
public fun cancel(): Unit
284+
public fun cancel()
254285

255286
/**
256287
* @suppress
@@ -268,6 +299,75 @@ public interface ReceiveChannel<out E> {
268299
public fun cancel(cause: Throwable? = null): Boolean
269300
}
270301

302+
/**
303+
* A discriminated union of [ReceiveChannel.receiveOrClosed] result,
304+
* that encapsulates either successfully received element of type [T] from the channel or a close cause.
305+
*/
306+
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
307+
public inline class ReceiveResult<out T>
308+
internal constructor(private val holder: Any?) {
309+
/**
310+
* Returns `true` if this instance represents received element.
311+
* In this case [isClosed] returns `false`.
312+
*/
313+
public val isValue: Boolean get() = holder !is Closed
314+
315+
/**
316+
* Returns `true` if this instance represents close cause.
317+
* In this case [isValue] returns `false`.
318+
*/
319+
public val isClosed: Boolean get() = holder is Closed
320+
321+
/**
322+
* Returns received value if this instance represents received value or throws [IllegalStateException] otherwise.
323+
*/
324+
@Suppress("UNCHECKED_CAST")
325+
public val value: T
326+
get() = if (isClosed) throw IllegalStateException() else holder as T
327+
328+
/**
329+
* Returns received value if this element represents received value or `null` otherwise.
330+
*/
331+
@Suppress("UNCHECKED_CAST")
332+
public val valueOrNull: T?
333+
get() = if (isClosed) null else holder as T
334+
335+
@Suppress("UNCHECKED_CAST")
336+
public val valueOrThrow: T
337+
get() = if (isClosed) throw closeCause else holder as T
338+
339+
/**
340+
* Returns close cause of the channel if this instance represents close cause or throws [IllegalStateException] otherwise.
341+
*/
342+
@Suppress("UNCHECKED_CAST")
343+
public val closeCause: Throwable get() = if (isClosed) (holder as Closed).exception else error("ReceiveResult is not closed")
344+
345+
/**
346+
* @suppress
347+
*/
348+
public override fun toString(): String =
349+
when (holder) {
350+
is Closed -> holder.toString()
351+
else -> "Value($holder)"
352+
}
353+
354+
internal class Closed(@JvmField val exception: Throwable) {
355+
override fun equals(other: Any?): Boolean = other is Closed && exception == other.exception
356+
override fun hashCode(): Int = exception.hashCode()
357+
override fun toString(): String = "Closed($exception)"
358+
}
359+
360+
internal companion object {
361+
@Suppress("NOTHING_TO_INLINE")
362+
internal inline fun <E> value(value: E): ReceiveResult<E> =
363+
ReceiveResult(value)
364+
365+
@Suppress("NOTHING_TO_INLINE")
366+
internal inline fun <E> closed(cause: Throwable): ReceiveResult<E> =
367+
ReceiveResult(Closed(cause))
368+
}
369+
}
370+
271371
/**
272372
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
273373
* from concurrent coroutines.

0 commit comments

Comments
 (0)