Skip to content

Commit 3a86f65

Browse files
committed
Introduce ReceiveChannel.receiveOrClosed and ReceiveChannel.onReceiveOrClosed
Fixes #330
1 parent 8a22c54 commit 3a86f65

File tree

9 files changed

+608
-30
lines changed

9 files changed

+608
-30
lines changed

build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) {
185185
"-Xuse-experimental=kotlin.experimental.ExperimentalTypeInference",
186186
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
187187
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
188-
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi"]
188+
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi",
189+
"-XXLanguage:+InlineClasses"]
189190

190191
}
191192
}

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+119-26
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import kotlin.jvm.*
1414

1515
/**
1616
* Abstract send channel. It is a base class for all send channel implementations.
17-
*
1817
*/
1918
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
2019
/** @suppress **This is unstable API and it is subject to change.** */
@@ -580,7 +579,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
580579

581580
@Suppress("UNCHECKED_CAST")
582581
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
583-
val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
582+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.THROW_ON_CLOSE)
584583
while (true) {
585584
if (enqueueReceive(receive)) {
586585
cont.initCancellability() // make it properly cancellable
@@ -628,7 +627,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
628627

629628
@Suppress("UNCHECKED_CAST")
630629
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
631-
val receive = ReceiveElement(cont, nullOnClose = true)
630+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.NULL_ON_CLOSE)
632631
while (true) {
633632
if (enqueueReceive(receive)) {
634633
cont.initCancellability() // make it properly cancellable
@@ -651,13 +650,42 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
651650
}
652651
}
653652

653+
@Suppress("UNCHECKED_CAST")
654+
public final override suspend fun receiveOrClosed(): ReceiveResult<E> {
655+
// fast path -- try poll non-blocking
656+
val result = pollInternal()
657+
if (result !== POLL_FAILED) {
658+
return result.toResult()
659+
}
660+
// slow-path does suspend
661+
return receiveOrClosedSuspend()
662+
}
663+
664+
@Suppress("UNCHECKED_CAST")
665+
private suspend fun receiveOrClosedSuspend(): ReceiveResult<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
666+
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
667+
while (true) {
668+
if (enqueueReceive(receive)) {
669+
cont.initCancellability()
670+
removeReceiveOnCancel(cont, receive)
671+
return@sc
672+
}
673+
674+
val result = pollInternal()
675+
if (result is Closed<*> || result !== POLL_FAILED) {
676+
cont.resume(result.toResult<E>())
677+
return@sc
678+
}
679+
}
680+
}
681+
654682
@Suppress("UNCHECKED_CAST")
655683
public final override fun poll(): E? {
656684
val result = pollInternal()
657685
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
658686
}
659687

660-
override fun cancel(): Unit {
688+
override fun cancel() {
661689
cancel(null)
662690
}
663691

@@ -712,9 +740,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
712740

713741
private inner class TryEnqueueReceiveDesc<E, R>(
714742
select: SelectInstance<R>,
715-
block: suspend (E?) -> R,
716-
nullOnClose: Boolean
717-
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
743+
block: suspend (Any?) -> R,
744+
mode: ResumeMode
745+
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, mode)) {
718746
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
719747
if (affected is Send) return ENQUEUE_FAILED
720748
return null
@@ -746,7 +774,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
746774
while (true) {
747775
if (select.isSelected) return
748776
if (isEmpty) {
749-
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
777+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as (suspend (Any?) -> R), ResumeMode.THROW_ON_CLOSE)
750778
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
751779
when {
752780
enqueueResult === ALREADY_SELECTED -> return
@@ -780,7 +808,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
780808
while (true) {
781809
if (select.isSelected) return
782810
if (isEmpty) {
783-
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
811+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.NULL_ON_CLOSE)
784812
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
785813
when {
786814
enqueueResult === ALREADY_SELECTED -> return
@@ -810,6 +838,43 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
810838
}
811839
}
812840

841+
override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
842+
get() = object : SelectClause1<ReceiveResult<E>> {
843+
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
844+
registerSelectReceiveOrClosed(select, block)
845+
}
846+
}
847+
848+
@Suppress("UNCHECKED_CAST")
849+
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
850+
while (true) {
851+
if (select.isSelected) return
852+
if (isEmpty) {
853+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.RECEIVE_RESULT)
854+
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
855+
when {
856+
enqueueResult === ALREADY_SELECTED -> return
857+
enqueueResult === ENQUEUE_FAILED -> {} // retry
858+
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
859+
}
860+
} else {
861+
val pollResult = pollSelectInternal(select)
862+
when {
863+
pollResult === ALREADY_SELECTED -> return
864+
pollResult === POLL_FAILED -> {} // retry
865+
pollResult is Closed<*> -> {
866+
block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion)
867+
}
868+
else -> {
869+
// selected successfully
870+
block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion)
871+
return
872+
}
873+
}
874+
}
875+
}
876+
}
877+
813878
// ------ protected ------
814879

815880
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
@@ -902,18 +967,31 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
902967
}
903968

904969
private class ReceiveElement<in E>(
905-
@JvmField val cont: CancellableContinuation<E?>,
906-
@JvmField val nullOnClose: Boolean
970+
@JvmField val cont: CancellableContinuation<Any?>,
971+
@JvmField val resumeMode: ResumeMode
907972
) : Receive<E>() {
908-
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
973+
@Suppress("IMPLICIT_CAST_TO_ANY")
974+
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
975+
val resumeValue = when (resumeMode) {
976+
ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value)
977+
else -> value
978+
}
979+
return cont.tryResume(resumeValue, idempotent)
980+
}
981+
909982
override fun completeResumeReceive(token: Any) = cont.completeResume(token)
910983
override fun resumeReceiveClosed(closed: Closed<*>) {
911-
if (closed.closeCause == null && nullOnClose)
912-
cont.resume(null)
913-
else
914-
cont.resumeWithException(closed.receiveException)
984+
when (resumeMode) {
985+
ResumeMode.THROW_ON_CLOSE -> cont.resumeWithException(closed.receiveException)
986+
ResumeMode.RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
987+
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
988+
cont.resume(null)
989+
} else {
990+
cont.resumeWithException(closed.receiveException)
991+
}
992+
}
915993
}
916-
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
994+
override fun toString(): String = "ReceiveElement[$cont,mode=$resumeMode]"
917995
}
918996

919997
private class ReceiveHasNext<E>(
@@ -957,26 +1035,27 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
9571035

9581036
private inner class ReceiveSelect<R, in E>(
9591037
@JvmField val select: SelectInstance<R>,
960-
@JvmField val block: suspend (E?) -> R,
961-
@JvmField val nullOnClose: Boolean
1038+
@JvmField val block: suspend (Any?) -> R,
1039+
@JvmField val mode: ResumeMode
9621040
) : Receive<E>(), DisposableHandle {
9631041
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
9641042
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
9651043

9661044
@Suppress("UNCHECKED_CAST")
9671045
override fun completeResumeReceive(token: Any) {
9681046
val value: E = (if (token === NULL_VALUE) null else token) as E
969-
block.startCoroutine(value, select.completion)
1047+
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ReceiveResult.value(value) else value, select.completion)
9701048
}
9711049

9721050
override fun resumeReceiveClosed(closed: Closed<*>) {
973-
if (select.trySelect(null)) {
974-
if (closed.closeCause == null && nullOnClose) {
1051+
if (!select.trySelect(null)) return
1052+
1053+
when (mode) {
1054+
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
1055+
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed<R>(closed.receiveException), select.completion)
1056+
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
9751057
block.startCoroutine(null, select.completion)
9761058
} else {
977-
// even though we are dispatching coroutine to process channel close on receive,
978-
// which is an atomically cancellable suspending function,
979-
// close is a final state, so we can use a cancellable resume mode
9801059
select.resumeSelectCancellableWithException(closed.receiveException)
9811060
}
9821061
}
@@ -991,7 +1070,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
9911070
onReceiveDequeued() // notify cancellation of receive
9921071
}
9931072

994-
override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
1073+
override fun toString(): String = "ReceiveSelect[$select,mode=$mode]"
9951074
}
9961075

9971076
private class IdempotentTokenValue<out E>(
@@ -1083,3 +1162,17 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
10831162
override val offerResult get() = OFFER_SUCCESS
10841163
abstract fun resumeReceiveClosed(closed: Closed<*>)
10851164
}
1165+
1166+
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
1167+
private inline fun <E> Any?.toResult(): ReceiveResult<E> =
1168+
if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E)
1169+
1170+
@Suppress("NOTHING_TO_INLINE")
1171+
private inline fun <E> Closed<*>.toResult(): ReceiveResult<E> = ReceiveResult.closed(receiveException)
1172+
1173+
// Marker for receive, receiveOrNull and receiveOrClosed
1174+
private enum class ResumeMode {
1175+
THROW_ON_CLOSE,
1176+
NULL_ON_CLOSE,
1177+
RECEIVE_RESULT
1178+
}

common/kotlinx-coroutines-core-common/src/channels/Channel.kt

+102-2
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public interface ReceiveChannel<out E> {
218218

219219
/**
220220
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
221-
* is received from the channel or selects with `null` if if the channel
221+
* is received from the channel or selects with `null` if the channel
222222
* [isClosedForReceive] without cause. The [select] invocation fails with
223223
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
224224
*
@@ -227,6 +227,37 @@ public interface ReceiveChannel<out E> {
227227
@ExperimentalCoroutinesApi
228228
public val onReceiveOrNull: SelectClause1<E?>
229229

230+
/**
231+
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
232+
* This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel
233+
* or [ReceiveResult] with close cause if channel was closed.
234+
*
235+
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
236+
* function is suspended, this function immediately resumes with [CancellationException].
237+
*
238+
* *Cancellation of suspended receive is atomic* -- when this function
239+
* throws [CancellationException] it means that the element was not retrieved from this channel.
240+
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
241+
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
242+
* was already resumed and the continuation was posted for execution to the thread's queue.
243+
*
244+
* Note, that this function does not check for cancellation when it is not suspended.
245+
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
246+
*
247+
* This function can be used in [select] invocation with [onReceiveOrClosed] clause.
248+
* Use [poll] to try receiving from this channel without waiting.
249+
*/
250+
@ExperimentalCoroutinesApi
251+
public suspend fun receiveOrClosed(): ReceiveResult<E>
252+
253+
/**
254+
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value
255+
* that is received from the channel or selects with [ReceiveResult] with a close cause if the channel
256+
* [isClosedForReceive].
257+
*/
258+
@ExperimentalCoroutinesApi
259+
public val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
260+
230261
/**
231262
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
232263
* or is [isClosedForReceive] without cause.
@@ -251,7 +282,7 @@ public interface ReceiveChannel<out E> {
251282
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
252283
* [ClosedReceiveChannelException].
253284
*/
254-
public fun cancel(): Unit
285+
public fun cancel()
255286

256287
/**
257288
* @suppress
@@ -269,6 +300,75 @@ public interface ReceiveChannel<out E> {
269300
public fun cancel(cause: Throwable? = null): Boolean
270301
}
271302

303+
/**
304+
* A discriminated union of [ReceiveChannel.receiveOrClosed] result,
305+
* that encapsulates either successfully received element of type [T] from the channel or a close cause.
306+
*/
307+
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
308+
public inline class ReceiveResult<out T>
309+
internal constructor(private val holder: Any?) {
310+
/**
311+
* Returns `true` if this instance represents received element.
312+
* In this case [isClosed] returns `false`.
313+
*/
314+
public val isValue: Boolean get() = holder !is Closed
315+
316+
/**
317+
* Returns `true` if this instance represents close cause.
318+
* In this case [isValue] returns `false`.
319+
*/
320+
public val isClosed: Boolean get() = holder is Closed
321+
322+
/**
323+
* Returns received value if this instance represents received value or throws [IllegalStateException] otherwise.
324+
*/
325+
@Suppress("UNCHECKED_CAST")
326+
public val value: T
327+
get() = if (isClosed) throw IllegalStateException() else holder as T
328+
329+
/**
330+
* Returns received value if this element represents received value or `null` otherwise.
331+
*/
332+
@Suppress("UNCHECKED_CAST")
333+
public val valueOrNull: T?
334+
get() = if (isClosed) null else holder as T
335+
336+
@Suppress("UNCHECKED_CAST")
337+
public val valueOrThrow: T
338+
get() = if (isClosed) throw closeCause else holder as T
339+
340+
/**
341+
* Returns close cause of the channel if this instance represents close cause or throws [IllegalStateException] otherwise.
342+
*/
343+
@Suppress("UNCHECKED_CAST")
344+
public val closeCause: Throwable get() = if (isClosed) (holder as Closed).exception else error("ReceiveResult is not closed")
345+
346+
/**
347+
* @suppress
348+
*/
349+
public override fun toString(): String =
350+
when (holder) {
351+
is Closed -> holder.toString()
352+
else -> "Value($holder)"
353+
}
354+
355+
internal class Closed(@JvmField val exception: Throwable) {
356+
override fun equals(other: Any?): Boolean = other is Closed && exception == other.exception
357+
override fun hashCode(): Int = exception.hashCode()
358+
override fun toString(): String = "Closed($exception)"
359+
}
360+
361+
internal companion object {
362+
@Suppress("NOTHING_TO_INLINE")
363+
internal inline fun <E> value(value: E): ReceiveResult<E> =
364+
ReceiveResult(value)
365+
366+
@Suppress("NOTHING_TO_INLINE")
367+
internal inline fun <E> closed(cause: Throwable): ReceiveResult<E> =
368+
ReceiveResult(Closed(cause))
369+
}
370+
}
371+
272372
/**
273373
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
274374
* from concurrent coroutines.

0 commit comments

Comments
 (0)