@@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
46
46
protected open fun offerInternal (element : E ): Any {
47
47
while (true ) {
48
48
val receive = takeFirstReceiveOrPeekClosed() ? : return OFFER_FAILED
49
- val token = receive.tryResumeReceive(element, idempotent = null )
49
+ val token = receive.tryResumeReceive(element, null )
50
50
if (token != null ) {
51
51
receive.completeResumeReceive(token)
52
52
return receive.offerResult
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
56
56
57
57
/* *
58
58
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59
- * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
60
60
* @suppress **This is unstable API and it is subject to change.**
61
61
*/
62
62
protected open fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
@@ -352,10 +352,13 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
352
352
else -> null
353
353
}
354
354
355
- override fun validatePrepared (node : ReceiveOrClosed <E >): Boolean {
356
- val token = node.tryResumeReceive(element, idempotent = this ) ? : return false
355
+ @Suppress(" UNCHECKED_CAST" )
356
+ override fun onPrepare (prepareOp : PrepareOp ): Any? {
357
+ val affected = prepareOp.affected as ReceiveOrClosed <E > // see "failure" impl
358
+ val token = affected.tryResumeReceive(element, prepareOp) ? : return REMOVE_PREPARED
359
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
357
360
resumeToken = token
358
- return true
361
+ return null
359
362
}
360
363
}
361
364
@@ -388,6 +391,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
388
391
when {
389
392
offerResult == = ALREADY_SELECTED -> return
390
393
offerResult == = OFFER_FAILED -> {} // retry
394
+ offerResult == = RETRY_ATOMIC -> {} // retry
391
395
offerResult == = OFFER_SUCCESS -> {
392
396
block.startCoroutineUnintercepted(receiver = this , completion = select.completion)
393
397
return
@@ -437,8 +441,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
437
441
@JvmField val select : SelectInstance <R >,
438
442
@JvmField val block : suspend (SendChannel <E >) -> R
439
443
) : Send(), DisposableHandle {
440
- override fun tryResumeSend (idempotent : Any ? ): Any? =
441
- if ( select.trySelect(idempotent)) SELECT_STARTED else null
444
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? =
445
+ select.trySelectOther(otherOp)
442
446
443
447
override fun completeResumeSend (token : Any ) {
444
448
assert { token == = SELECT_STARTED }
@@ -450,7 +454,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
450
454
}
451
455
452
456
override fun resumeSendClosed (closed : Closed <* >) {
453
- if (select.trySelect(null ))
457
+ if (select.trySelect())
454
458
select.resumeSelectCancellableWithException(closed.sendException)
455
459
}
456
460
@@ -461,7 +465,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
461
465
@JvmField val element : E
462
466
) : Send() {
463
467
override val pollResult: Any? get() = element
464
- override fun tryResumeSend (idempotent : Any ? ): Any? = SEND_RESUMED
468
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = SEND_RESUMED . also { otherOp?.finishPrepare() }
465
469
override fun completeResumeSend (token : Any ) { assert { token == = SEND_RESUMED } }
466
470
override fun resumeSendClosed (closed : Closed <* >) {}
467
471
}
@@ -495,7 +499,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
495
499
protected open fun pollInternal (): Any? {
496
500
while (true ) {
497
501
val send = takeFirstSendOrPeekClosed() ? : return POLL_FAILED
498
- val token = send.tryResumeSend(idempotent = null )
502
+ val token = send.tryResumeSend(null )
499
503
if (token != null ) {
500
504
send.completeResumeSend(token)
501
505
return send.pollResult
@@ -505,7 +509,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
505
509
506
510
/* *
507
511
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
508
- * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
512
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
509
513
* @suppress **This is unstable API and it is subject to change.**
510
514
*/
511
515
protected open fun pollSelectInternal (select : SelectInstance <* >): Any? {
@@ -655,11 +659,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
655
659
}
656
660
657
661
@Suppress(" UNCHECKED_CAST" )
658
- override fun validatePrepared (node : Send ): Boolean {
659
- val token = node.tryResumeSend(idempotent = this ) ? : return false
662
+ override fun onPrepare (prepareOp : PrepareOp ): Any? {
663
+ val affected = prepareOp.affected as Send // see "failure" impl
664
+ val token = affected.tryResumeSend(prepareOp) ? : return REMOVE_PREPARED
665
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
660
666
resumeToken = token
661
- pollResult = node .pollResult as E
662
- return true
667
+ pollResult = affected .pollResult as E
668
+ return null
663
669
}
664
670
}
665
671
@@ -681,6 +687,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
681
687
when {
682
688
pollResult == = ALREADY_SELECTED -> return
683
689
pollResult == = POLL_FAILED -> {} // retry
690
+ pollResult == = RETRY_ATOMIC -> {} // retry
684
691
pollResult is Closed <* > -> throw recoverStackTrace(pollResult.receiveException)
685
692
else -> {
686
693
block.startCoroutineUnintercepted(pollResult as E , select.completion)
@@ -709,9 +716,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
709
716
when {
710
717
pollResult == = ALREADY_SELECTED -> return
711
718
pollResult == = POLL_FAILED -> {} // retry
719
+ pollResult == = RETRY_ATOMIC -> {} // retry
712
720
pollResult is Closed <* > -> {
713
721
if (pollResult.closeCause == null ) {
714
- if (select.trySelect(null ))
722
+ if (select.trySelect())
715
723
block.startCoroutineUnintercepted(null , select.completion)
716
724
return
717
725
} else {
@@ -746,6 +754,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
746
754
when {
747
755
pollResult == = ALREADY_SELECTED -> return
748
756
pollResult == = POLL_FAILED -> {} // retry
757
+ pollResult == = RETRY_ATOMIC -> {} // retry
749
758
pollResult is Closed <* > -> {
750
759
block.startCoroutineUnintercepted(ValueOrClosed .closed(pollResult.closeCause), select.completion)
751
760
}
@@ -870,7 +879,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
870
879
}
871
880
872
881
@Suppress(" IMPLICIT_CAST_TO_ANY" )
873
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? = cont.tryResume(resumeValue(value), idempotent)
882
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
883
+ otherOp?.finishPrepare()
884
+ return cont.tryResume(resumeValue(value), otherOp?.desc)
885
+ }
886
+
874
887
override fun completeResumeReceive (token : Any ) = cont.completeResume(token)
875
888
override fun resumeReceiveClosed (closed : Closed <* >) {
876
889
when {
@@ -886,15 +899,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
886
899
@JvmField val iterator : Itr <E >,
887
900
@JvmField val cont : CancellableContinuation <Boolean >
888
901
) : Receive<E>() {
889
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? {
890
- val token = cont.tryResume(true , idempotent)
902
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
903
+ otherOp?.finishPrepare()
904
+ val token = cont.tryResume(true , otherOp?.desc)
891
905
if (token != null ) {
892
906
/*
893
- When idempotent != null this invocation can be stale and we cannot directly update iterator.result
907
+ When otherOp != null this invocation can be stale and we cannot directly update iterator.result
894
908
Instead, we save both token & result into a temporary IdempotentTokenValue object and
895
909
set iterator result only in completeResumeReceive that is going to be invoked just once
896
910
*/
897
- if (idempotent != null ) return IdempotentTokenValue (token, value)
911
+ if (otherOp != null ) return IdempotentTokenValue (token, value)
898
912
iterator.result = value
899
913
}
900
914
return token
@@ -928,8 +942,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
928
942
@JvmField val block : suspend (Any? ) -> R ,
929
943
@JvmField val receiveMode : Int
930
944
) : Receive<E>(), DisposableHandle {
931
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? =
932
- if (select.trySelect(idempotent)) (value ? : NULL_VALUE ) else null
945
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
946
+ val result = select.trySelectOther(otherOp)
947
+ return if (result == = SELECT_STARTED ) value ? : NULL_VALUE else result
948
+ }
933
949
934
950
@Suppress(" UNCHECKED_CAST" )
935
951
override fun completeResumeReceive (token : Any ) {
@@ -938,7 +954,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
938
954
}
939
955
940
956
override fun resumeReceiveClosed (closed : Closed <* >) {
941
- if (! select.trySelect(null )) return
957
+ if (! select.trySelect()) return
942
958
when (receiveMode) {
943
959
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
944
960
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed .closed<R >(closed.closeCause), select.completion)
@@ -985,10 +1001,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
985
1001
@SharedImmutable
986
1002
internal val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
987
1003
988
- @JvmField
989
- @SharedImmutable
990
- internal val SELECT_STARTED : Any = Symbol (" SELECT_STARTED" )
991
-
992
1004
@JvmField
993
1005
@SharedImmutable
994
1006
internal val NULL_VALUE : Symbol = Symbol (" NULL_VALUE" )
@@ -1012,7 +1024,11 @@ internal typealias Handler = (Throwable?) -> Unit
1012
1024
*/
1013
1025
internal abstract class Send : LockFreeLinkedListNode () {
1014
1026
abstract val pollResult: Any? // E | Closed
1015
- abstract fun tryResumeSend (idempotent : Any? ): Any?
1027
+ // Returns: null - failure,
1028
+ // RETRY_ATOMIC for retry (only when otherOp != null),
1029
+ // otherwise token for completeResumeSend
1030
+ // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1031
+ abstract fun tryResumeSend (otherOp : PrepareOp ? ): Any?
1016
1032
abstract fun completeResumeSend (token : Any )
1017
1033
abstract fun resumeSendClosed (closed : Closed <* >)
1018
1034
}
@@ -1022,7 +1038,11 @@ internal abstract class Send : LockFreeLinkedListNode() {
1022
1038
*/
1023
1039
internal interface ReceiveOrClosed <in E > {
1024
1040
val offerResult: Any // OFFER_SUCCESS | Closed
1025
- fun tryResumeReceive (value : E , idempotent : Any? ): Any?
1041
+ // Returns: null - failure,
1042
+ // RETRY_ATOMIC for retry (only when otherOp != null),
1043
+ // otherwise token for completeResumeReceive
1044
+ // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1045
+ fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any?
1026
1046
fun completeResumeReceive (token : Any )
1027
1047
}
1028
1048
@@ -1034,7 +1054,10 @@ internal class SendElement(
1034
1054
override val pollResult : Any? ,
1035
1055
@JvmField val cont : CancellableContinuation <Unit >
1036
1056
) : Send() {
1037
- override fun tryResumeSend (idempotent : Any? ): Any? = cont.tryResume(Unit , idempotent)
1057
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? {
1058
+ otherOp?.finishPrepare()
1059
+ return cont.tryResume(Unit , otherOp?.desc)
1060
+ }
1038
1061
override fun completeResumeSend (token : Any ) = cont.completeResume(token)
1039
1062
override fun resumeSendClosed (closed : Closed <* >) = cont.resumeWithException(closed.sendException)
1040
1063
override fun toString (): String = " SendElement($pollResult )"
@@ -1051,9 +1074,9 @@ internal class Closed<in E>(
1051
1074
1052
1075
override val offerResult get() = this
1053
1076
override val pollResult get() = this
1054
- override fun tryResumeSend (idempotent : Any ? ): Any? = CLOSE_RESUMED
1077
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED . also { otherOp?.finishPrepare() }
1055
1078
override fun completeResumeSend (token : Any ) { assert { token == = CLOSE_RESUMED } }
1056
- override fun tryResumeReceive (value : E , idempotent : Any ? ): Any? = CLOSE_RESUMED
1079
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED . also { otherOp?.finishPrepare() }
1057
1080
override fun completeResumeReceive (token : Any ) { assert { token == = CLOSE_RESUMED } }
1058
1081
override fun resumeSendClosed (closed : Closed <* >) = assert { false } // "Should be never invoked"
1059
1082
override fun toString (): String = " Closed[$closeCause ]"
0 commit comments