@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
56
56
57
57
/* *
58
58
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59
- * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
60
60
* @suppress **This is unstable API and it is subject to change.**
61
61
*/
62
62
protected open fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
@@ -345,10 +345,11 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
345
345
else -> null
346
346
}
347
347
348
- override fun validatePrepared (node : ReceiveOrClosed <E >): Boolean {
349
- val token = node.tryResumeReceive(element, idempotent = this ) ? : return false
348
+ override fun validatePrepared (node : ReceiveOrClosed <E >): Any? {
349
+ val token = node.tryResumeReceive(element, idempotent = this ) ? : return REMOVE_PREPARED
350
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
350
351
resumeToken = token
351
- return true
352
+ return null
352
353
}
353
354
}
354
355
@@ -384,6 +385,9 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
384
385
when {
385
386
offerResult == = ALREADY_SELECTED -> return
386
387
offerResult == = OFFER_FAILED -> {} // retry
388
+ offerResult == = RETRY_ATOMIC -> {
389
+ // println("registerSelectSend($element): RETRY")
390
+ } // retry
387
391
offerResult == = OFFER_SUCCESS -> {
388
392
block.startCoroutineUnintercepted(receiver = this , completion = select.completion)
389
393
return
@@ -436,20 +440,25 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
436
440
@JvmField val select : SelectInstance <R >,
437
441
@JvmField val block : suspend (SendChannel <E >) -> R
438
442
) : Send(), DisposableHandle {
439
- override fun tryResumeSend (idempotent : Any? ): Any? =
440
- if (select.trySelect(idempotent)) SELECT_STARTED else null
443
+ override fun tryResumeSend (idempotent : Any? ): Any? {
444
+ val result = select.trySelectIdempotent(idempotent)
445
+ // println("SendSelect($pollResult) tryResumeSend=$result")
446
+ return result
447
+ }
441
448
442
449
override fun completeResumeSend (token : Any ) {
443
450
assert { token == = SELECT_STARTED }
451
+ // println("SendSelect($pollResult) completeResumeSend")
444
452
block.startCoroutine(receiver = channel, completion = select.completion)
445
453
}
446
454
447
455
override fun dispose () { // invoked on select completion
456
+ // println("SendSelect($pollResult) dispose")
448
457
remove()
449
458
}
450
459
451
460
override fun resumeSendClosed (closed : Closed <* >) {
452
- if (select.trySelect(null ))
461
+ if (select.trySelect())
453
462
select.resumeSelectCancellableWithException(closed.sendException)
454
463
}
455
464
@@ -504,7 +513,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
504
513
505
514
/* *
506
515
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
507
- * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
516
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
508
517
* @suppress **This is unstable API and it is subject to change.**
509
518
*/
510
519
protected open fun pollSelectInternal (select : SelectInstance <* >): Any? {
@@ -644,7 +653,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
644
653
* @suppress **This is unstable API and it is subject to change.**
645
654
*/
646
655
protected class TryPollDesc <E >(queue : LockFreeLinkedListHead ) : RemoveFirstDesc<Send>(queue) {
647
- @JvmField var resumeToken: Any? = null
656
+ @JvmField var resumeToken: Any? = null // can be RETRY_ATOMIC
648
657
@JvmField var pollResult: E ? = null
649
658
650
659
override fun failure (affected : LockFreeLinkedListNode ): Any? = when (affected) {
@@ -654,11 +663,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
654
663
}
655
664
656
665
@Suppress(" UNCHECKED_CAST" )
657
- override fun validatePrepared (node : Send ): Boolean {
658
- val token = node.tryResumeSend(idempotent = this ) ? : return false
666
+ override fun validatePrepared (node : Send ): Any? {
667
+ val token = node.tryResumeSend(idempotent = this ) ? : return REMOVE_PREPARED
668
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
659
669
resumeToken = token
660
670
pollResult = node.pollResult as E
661
- return true
671
+ return null
662
672
}
663
673
}
664
674
@@ -680,6 +690,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
680
690
when {
681
691
pollResult == = ALREADY_SELECTED -> return
682
692
pollResult == = POLL_FAILED -> {} // retry
693
+ pollResult == = RETRY_ATOMIC -> {
694
+ // println("registerSelectReceive(): RETRY")
695
+ } // retry
683
696
pollResult is Closed <* > -> throw recoverStackTrace(pollResult.receiveException)
684
697
else -> {
685
698
block.startCoroutineUnintercepted(pollResult as E , select.completion)
@@ -708,9 +721,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
708
721
when {
709
722
pollResult == = ALREADY_SELECTED -> return
710
723
pollResult == = POLL_FAILED -> {} // retry
724
+ pollResult == = RETRY_ATOMIC -> {} // retry
711
725
pollResult is Closed <* > -> {
712
726
if (pollResult.closeCause == null ) {
713
- if (select.trySelect(null ))
727
+ if (select.trySelect())
714
728
block.startCoroutineUnintercepted(null , select.completion)
715
729
return
716
730
} else {
@@ -745,6 +759,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
745
759
when {
746
760
pollResult == = ALREADY_SELECTED -> return
747
761
pollResult == = POLL_FAILED -> {} // retry
762
+ pollResult == = RETRY_ATOMIC -> {} // retry
748
763
pollResult is Closed <* > -> {
749
764
block.startCoroutineUnintercepted(ValueOrClosed .closed(pollResult.closeCause), select.completion)
750
765
}
@@ -927,8 +942,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
927
942
@JvmField val block : suspend (Any? ) -> R ,
928
943
@JvmField val receiveMode : Int
929
944
) : Receive<E>(), DisposableHandle {
930
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? =
931
- if (select.trySelect(idempotent)) (value ? : NULL_VALUE ) else null
945
+ override fun tryResumeReceive (value : E , idempotent : Any? ): Any? {
946
+ val result = select.trySelectIdempotent(idempotent)
947
+ return if (result == = SELECT_STARTED ) value ? : NULL_VALUE else result
948
+ }
932
949
933
950
@Suppress(" UNCHECKED_CAST" )
934
951
override fun completeResumeReceive (token : Any ) {
@@ -937,7 +954,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
937
954
}
938
955
939
956
override fun resumeReceiveClosed (closed : Closed <* >) {
940
- if (! select.trySelect(null )) return
957
+ if (! select.trySelect()) return
941
958
when (receiveMode) {
942
959
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
943
960
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed .closed<R >(closed.closeCause), select.completion)
@@ -984,10 +1001,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
984
1001
@SharedImmutable
985
1002
internal val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
986
1003
987
- @JvmField
988
- @SharedImmutable
989
- internal val SELECT_STARTED : Any = Symbol (" SELECT_STARTED" )
990
-
991
1004
@JvmField
992
1005
@SharedImmutable
993
1006
internal val NULL_VALUE : Symbol = Symbol (" NULL_VALUE" )
@@ -1011,6 +1024,7 @@ internal typealias Handler = (Throwable?) -> Unit
1011
1024
*/
1012
1025
internal abstract class Send : LockFreeLinkedListNode () {
1013
1026
abstract val pollResult: Any? // E | Closed
1027
+ // Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeSend
1014
1028
abstract fun tryResumeSend (idempotent : Any? ): Any?
1015
1029
abstract fun completeResumeSend (token : Any )
1016
1030
abstract fun resumeSendClosed (closed : Closed <* >)
@@ -1021,6 +1035,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
1021
1035
*/
1022
1036
internal interface ReceiveOrClosed <in E > {
1023
1037
val offerResult: Any // OFFER_SUCCESS | Closed
1038
+ // Returns: null - failure, RETRY_ATOMIC for retry (only when idempotent != null), otherwise token for completeResumeReceive
1024
1039
fun tryResumeReceive (value : E , idempotent : Any? ): Any?
1025
1040
fun completeResumeReceive (token : Any )
1026
1041
}
0 commit comments