@@ -692,100 +692,71 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
692
692
693
693
final override val onReceive: SelectClause1 <E >
694
694
get() = object : SelectClause1 <E > {
695
+ @Suppress(" UNCHECKED_CAST" )
695
696
override fun <R > registerSelectClause1 (select : SelectInstance <R >, block : suspend (E ) -> R ) {
696
- registerSelectReceive (select, block)
697
+ registerSelectReceiveMode (select, RECEIVE_THROWS_ON_CLOSE , block as suspend ( Any? ) -> R )
697
698
}
698
699
}
699
700
700
- @Suppress(" UNCHECKED_CAST" )
701
- private fun <R > registerSelectReceive (select : SelectInstance <R >, block : suspend (E ) -> R ) {
702
- while (true ) {
703
- if (select.isSelected) return
704
- if (isEmpty) {
705
- if (enqueueReceiveSelect(select, block as suspend (Any? ) -> R , RECEIVE_THROWS_ON_CLOSE )) return
706
- } else {
707
- val pollResult = pollSelectInternal(select)
708
- when {
709
- pollResult == = ALREADY_SELECTED -> return
710
- pollResult == = POLL_FAILED -> {} // retry
711
- pollResult == = RETRY_ATOMIC -> {} // retry
712
- pollResult is Closed <* > -> throw recoverStackTrace(pollResult.receiveException)
713
- else -> {
714
- block.startCoroutineUnintercepted(pollResult as E , select.completion)
715
- return
716
- }
717
- }
718
- }
719
- }
720
- }
721
-
722
701
final override val onReceiveOrNull: SelectClause1 <E ?>
723
702
get() = object : SelectClause1 <E ?> {
703
+ @Suppress(" UNCHECKED_CAST" )
724
704
override fun <R > registerSelectClause1 (select : SelectInstance <R >, block : suspend (E ? ) -> R ) {
725
- registerSelectReceiveOrNull(select, block)
726
- }
727
- }
728
-
729
- @Suppress(" UNCHECKED_CAST" )
730
- private fun <R > registerSelectReceiveOrNull (select : SelectInstance <R >, block : suspend (E ? ) -> R ) {
731
- while (true ) {
732
- if (select.isSelected) return
733
- if (isEmpty) {
734
- if (enqueueReceiveSelect(select, block as suspend (Any? ) -> R , RECEIVE_NULL_ON_CLOSE )) return
735
- } else {
736
- val pollResult = pollSelectInternal(select)
737
- when {
738
- pollResult == = ALREADY_SELECTED -> return
739
- pollResult == = POLL_FAILED -> {} // retry
740
- pollResult == = RETRY_ATOMIC -> {} // retry
741
- pollResult is Closed <* > -> {
742
- if (pollResult.closeCause == null ) {
743
- if (select.trySelect())
744
- block.startCoroutineUnintercepted(null , select.completion)
745
- return
746
- } else {
747
- throw recoverStackTrace(pollResult.closeCause)
748
- }
749
- }
750
- else -> {
751
- // selected successfully, pollSelectInternal is responsible for the select
752
- block.startCoroutineUnintercepted(pollResult as E , select.completion)
753
- return
754
- }
755
- }
705
+ registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE , block as suspend (Any? ) -> R )
756
706
}
757
707
}
758
- }
759
708
760
- override val onReceiveOrClosed: SelectClause1 <ValueOrClosed <E >>
709
+ final override val onReceiveOrClosed: SelectClause1 <ValueOrClosed <E >>
761
710
get() = object : SelectClause1 <ValueOrClosed <E >> {
711
+ @Suppress(" UNCHECKED_CAST" )
762
712
override fun <R > registerSelectClause1 (select : SelectInstance <R >, block : suspend (ValueOrClosed <E >) -> R ) {
763
- registerSelectReceiveOrClosed (select, block)
713
+ registerSelectReceiveMode (select, RECEIVE_RESULT , block as suspend ( Any? ) -> R )
764
714
}
765
715
}
766
716
767
- @Suppress(" UNCHECKED_CAST" )
768
- private fun <R > registerSelectReceiveOrClosed (select : SelectInstance <R >, block : suspend (ValueOrClosed <E >) -> R ) {
717
+ private fun <R > registerSelectReceiveMode (select : SelectInstance <R >, receiveMode : Int , block : suspend (Any? ) -> R ) {
769
718
while (true ) {
770
719
if (select.isSelected) return
771
720
if (isEmpty) {
772
- if (enqueueReceiveSelect(select, block as suspend ( Any? ) -> R , RECEIVE_RESULT )) return
721
+ if (enqueueReceiveSelect(select, block, receiveMode )) return
773
722
} else {
774
723
val pollResult = pollSelectInternal(select)
775
724
when {
776
725
pollResult == = ALREADY_SELECTED -> return
777
726
pollResult == = POLL_FAILED -> {} // retry
778
727
pollResult == = RETRY_ATOMIC -> {} // retry
779
- pollResult is Closed <* > -> {
780
- if (select.trySelect())
781
- block.startCoroutineUnintercepted(ValueOrClosed .closed(pollResult.closeCause), select.completion)
782
- return
728
+ else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
729
+ }
730
+ }
731
+ }
732
+ }
733
+
734
+ private fun <R > (suspend (Any? ) -> R ).tryStartBlockUnintercepted(select : SelectInstance <R >, receiveMode : Int , value : Any? ) {
735
+ when (value) {
736
+ is Closed <* > -> {
737
+ when (receiveMode) {
738
+ RECEIVE_THROWS_ON_CLOSE -> {
739
+ throw recoverStackTrace(value.receiveException)
783
740
}
784
- else -> {
785
- // selected successfully, pollSelectInternal is responsible for the select
786
- block.startCoroutineUnintercepted(ValueOrClosed .value(pollResult as E ), select.completion)
787
- return
741
+ RECEIVE_RESULT -> {
742
+ if (! select.trySelect()) return
743
+ startCoroutineUnintercepted(ValueOrClosed .closed<Any >(value.closeCause), select.completion)
788
744
}
745
+ RECEIVE_NULL_ON_CLOSE -> {
746
+ if (value.closeCause == null ) {
747
+ if (! select.trySelect()) return
748
+ startCoroutineUnintercepted(null , select.completion)
749
+ } else {
750
+ throw recoverStackTrace(value.receiveException)
751
+ }
752
+ }
753
+ }
754
+ }
755
+ else -> {
756
+ if (receiveMode == RECEIVE_RESULT ) {
757
+ startCoroutineUnintercepted(value.toResult<Any >(), select.completion)
758
+ } else {
759
+ startCoroutineUnintercepted(value, select.completion)
789
760
}
790
761
}
791
762
}
0 commit comments