Skip to content

Commit f8a6880

Browse files
committed
Simplify AbstractChannel select implementations
1 parent f0bbeb8 commit f8a6880

File tree

1 file changed

+19
-22
lines changed

1 file changed

+19
-22
lines changed

common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt

+19-22
Original file line numberDiff line numberDiff line change
@@ -774,13 +774,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
774774
while (true) {
775775
if (select.isSelected) return
776776
if (isEmpty) {
777-
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as (suspend (Any?) -> R), ResumeMode.THROW_ON_CLOSE)
778-
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
779-
when {
780-
enqueueResult === ALREADY_SELECTED -> return
781-
enqueueResult === ENQUEUE_FAILED -> {} // retry
782-
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
783-
}
777+
if (registerEnqueueDesc(select, block, ResumeMode.THROW_ON_CLOSE)) return
784778
} else {
785779
val pollResult = pollSelectInternal(select)
786780
when {
@@ -808,13 +802,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
808802
while (true) {
809803
if (select.isSelected) return
810804
if (isEmpty) {
811-
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.NULL_ON_CLOSE)
812-
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
813-
when {
814-
enqueueResult === ALREADY_SELECTED -> return
815-
enqueueResult === ENQUEUE_FAILED -> {} // retry
816-
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
817-
}
805+
if (registerEnqueueDesc(select, block, ResumeMode.NULL_ON_CLOSE)) return
818806
} else {
819807
val pollResult = pollSelectInternal(select)
820808
when {
@@ -825,8 +813,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
825813
if (select.trySelect(null))
826814
block.startCoroutineUnintercepted(null, select.completion)
827815
return
828-
} else
816+
} else {
829817
throw pollResult.closeCause
818+
}
830819
}
831820
else -> {
832821
// selected successfully
@@ -850,13 +839,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
850839
while (true) {
851840
if (select.isSelected) return
852841
if (isEmpty) {
853-
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.RECEIVE_RESULT)
854-
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
855-
when {
856-
enqueueResult === ALREADY_SELECTED -> return
857-
enqueueResult === ENQUEUE_FAILED -> {} // retry
858-
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
859-
}
842+
if (registerEnqueueDesc(select, block, ResumeMode.RECEIVE_RESULT)) return
860843
} else {
861844
val pollResult = pollSelectInternal(select)
862845
when {
@@ -875,6 +858,20 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
875858
}
876859
}
877860

861+
private fun <R, E> registerEnqueueDesc(select: SelectInstance<R>, block: suspend (E) -> R,
862+
mode: ResumeMode): Boolean {
863+
@Suppress("UNCHECKED_CAST")
864+
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, mode)
865+
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
866+
return when {
867+
enqueueResult === ALREADY_SELECTED -> true
868+
enqueueResult === ENQUEUE_FAILED -> {
869+
false // retry
870+
}
871+
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
872+
}
873+
}
874+
878875
// ------ protected ------
879876

880877
override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =

0 commit comments

Comments
 (0)