@@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
46
46
protected open fun offerInternal (element : E ): Any {
47
47
while (true ) {
48
48
val receive = takeFirstReceiveOrPeekClosed() ? : return OFFER_FAILED
49
- val token = receive.tryResumeReceive(element, idempotent = null )
49
+ val token = receive.tryResumeReceive(element, null )
50
50
if (token != null ) {
51
51
receive.completeResumeReceive(token)
52
52
return receive.offerResult
@@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
56
56
57
57
/* *
58
58
* Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
59
- * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
59
+ * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
60
60
* @suppress **This is unstable API and it is subject to change.**
61
61
*/
62
62
protected open fun offerSelectInternal (element : E , select : SelectInstance <* >): Any {
@@ -362,10 +362,13 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
362
362
else -> null
363
363
}
364
364
365
- override fun validatePrepared (node : ReceiveOrClosed <E >): Boolean {
366
- val token = node.tryResumeReceive(element, idempotent = this ) ? : return false
365
+ @Suppress(" UNCHECKED_CAST" )
366
+ override fun onPrepare (prepareOp : PrepareOp ): Any? {
367
+ val affected = prepareOp.affected as ReceiveOrClosed <E > // see "failure" impl
368
+ val token = affected.tryResumeReceive(element, prepareOp) ? : return REMOVE_PREPARED
369
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
367
370
resumeToken = token
368
- return true
371
+ return null
369
372
}
370
373
}
371
374
@@ -398,6 +401,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
398
401
when {
399
402
offerResult == = ALREADY_SELECTED -> return
400
403
offerResult == = OFFER_FAILED -> {} // retry
404
+ offerResult == = RETRY_ATOMIC -> {} // retry
401
405
offerResult == = OFFER_SUCCESS -> {
402
406
block.startCoroutineUnintercepted(receiver = this , completion = select.completion)
403
407
return
@@ -447,8 +451,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
447
451
@JvmField val select : SelectInstance <R >,
448
452
@JvmField val block : suspend (SendChannel <E >) -> R
449
453
) : Send(), DisposableHandle {
450
- override fun tryResumeSend (idempotent : Any ? ): Any? =
451
- if ( select.trySelect(idempotent)) SELECT_STARTED else null
454
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? =
455
+ select.trySelectOther(otherOp)
452
456
453
457
override fun completeResumeSend (token : Any ) {
454
458
assert { token == = SELECT_STARTED }
@@ -460,7 +464,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
460
464
}
461
465
462
466
override fun resumeSendClosed (closed : Closed <* >) {
463
- if (select.trySelect(null ))
467
+ if (select.trySelect())
464
468
select.resumeSelectCancellableWithException(closed.sendException)
465
469
}
466
470
@@ -471,7 +475,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
471
475
@JvmField val element : E
472
476
) : Send() {
473
477
override val pollResult: Any? get() = element
474
- override fun tryResumeSend (idempotent : Any ? ): Any? = SEND_RESUMED
478
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = SEND_RESUMED . also { otherOp?.finishPrepare() }
475
479
override fun completeResumeSend (token : Any ) { assert { token == = SEND_RESUMED } }
476
480
override fun resumeSendClosed (closed : Closed <* >) {}
477
481
}
@@ -505,7 +509,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
505
509
protected open fun pollInternal (): Any? {
506
510
while (true ) {
507
511
val send = takeFirstSendOrPeekClosed() ? : return POLL_FAILED
508
- val token = send.tryResumeSend(idempotent = null )
512
+ val token = send.tryResumeSend(null )
509
513
if (token != null ) {
510
514
send.completeResumeSend(token)
511
515
return send.pollResult
@@ -515,7 +519,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
515
519
516
520
/* *
517
521
* Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
518
- * Return type is `ALREADY_SELECTED | E | POLL_FAILED | Closed`
522
+ * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
519
523
* @suppress **This is unstable API and it is subject to change.**
520
524
*/
521
525
protected open fun pollSelectInternal (select : SelectInstance <* >): Any? {
@@ -665,11 +669,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
665
669
}
666
670
667
671
@Suppress(" UNCHECKED_CAST" )
668
- override fun validatePrepared (node : Send ): Boolean {
669
- val token = node.tryResumeSend(idempotent = this ) ? : return false
672
+ override fun onPrepare (prepareOp : PrepareOp ): Any? {
673
+ val affected = prepareOp.affected as Send // see "failure" impl
674
+ val token = affected.tryResumeSend(prepareOp) ? : return REMOVE_PREPARED
675
+ if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
670
676
resumeToken = token
671
- pollResult = node .pollResult as E
672
- return true
677
+ pollResult = affected .pollResult as E
678
+ return null
673
679
}
674
680
}
675
681
@@ -691,6 +697,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
691
697
when {
692
698
pollResult == = ALREADY_SELECTED -> return
693
699
pollResult == = POLL_FAILED -> {} // retry
700
+ pollResult == = RETRY_ATOMIC -> {} // retry
694
701
pollResult is Closed <* > -> throw recoverStackTrace(pollResult.receiveException)
695
702
else -> {
696
703
block.startCoroutineUnintercepted(pollResult as E , select.completion)
@@ -719,9 +726,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
719
726
when {
720
727
pollResult == = ALREADY_SELECTED -> return
721
728
pollResult == = POLL_FAILED -> {} // retry
729
+ pollResult == = RETRY_ATOMIC -> {} // retry
722
730
pollResult is Closed <* > -> {
723
731
if (pollResult.closeCause == null ) {
724
- if (select.trySelect(null ))
732
+ if (select.trySelect())
725
733
block.startCoroutineUnintercepted(null , select.completion)
726
734
return
727
735
} else {
@@ -756,6 +764,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
756
764
when {
757
765
pollResult == = ALREADY_SELECTED -> return
758
766
pollResult == = POLL_FAILED -> {} // retry
767
+ pollResult == = RETRY_ATOMIC -> {} // retry
759
768
pollResult is Closed <* > -> {
760
769
block.startCoroutineUnintercepted(ValueOrClosed .closed(pollResult.closeCause), select.completion)
761
770
}
@@ -880,7 +889,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
880
889
}
881
890
882
891
@Suppress(" IMPLICIT_CAST_TO_ANY" )
883
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? = cont.tryResume(resumeValue(value), idempotent)
892
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
893
+ otherOp?.finishPrepare()
894
+ return cont.tryResume(resumeValue(value), otherOp?.desc)
895
+ }
896
+
884
897
override fun completeResumeReceive (token : Any ) = cont.completeResume(token)
885
898
override fun resumeReceiveClosed (closed : Closed <* >) {
886
899
when {
@@ -896,15 +909,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
896
909
@JvmField val iterator : Itr <E >,
897
910
@JvmField val cont : CancellableContinuation <Boolean >
898
911
) : Receive<E>() {
899
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? {
900
- val token = cont.tryResume(true , idempotent)
912
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
913
+ otherOp?.finishPrepare()
914
+ val token = cont.tryResume(true , otherOp?.desc)
901
915
if (token != null ) {
902
916
/*
903
- When idempotent != null this invocation can be stale and we cannot directly update iterator.result
917
+ When otherOp != null this invocation can be stale and we cannot directly update iterator.result
904
918
Instead, we save both token & result into a temporary IdempotentTokenValue object and
905
919
set iterator result only in completeResumeReceive that is going to be invoked just once
906
920
*/
907
- if (idempotent != null ) return IdempotentTokenValue (token, value)
921
+ if (otherOp != null ) return IdempotentTokenValue (token, value)
908
922
iterator.result = value
909
923
}
910
924
return token
@@ -938,8 +952,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
938
952
@JvmField val block : suspend (Any? ) -> R ,
939
953
@JvmField val receiveMode : Int
940
954
) : Receive<E>(), DisposableHandle {
941
- override fun tryResumeReceive (value : E , idempotent : Any? ): Any? =
942
- if (select.trySelect(idempotent)) (value ? : NULL_VALUE ) else null
955
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
956
+ val result = select.trySelectOther(otherOp)
957
+ return if (result == = SELECT_STARTED ) value ? : NULL_VALUE else result
958
+ }
943
959
944
960
@Suppress(" UNCHECKED_CAST" )
945
961
override fun completeResumeReceive (token : Any ) {
@@ -948,7 +964,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
948
964
}
949
965
950
966
override fun resumeReceiveClosed (closed : Closed <* >) {
951
- if (! select.trySelect(null )) return
967
+ if (! select.trySelect()) return
952
968
when (receiveMode) {
953
969
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
954
970
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed .closed<R >(closed.closeCause), select.completion)
@@ -995,10 +1011,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
995
1011
@SharedImmutable
996
1012
internal val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
997
1013
998
- @JvmField
999
- @SharedImmutable
1000
- internal val SELECT_STARTED : Any = Symbol (" SELECT_STARTED" )
1001
-
1002
1014
@JvmField
1003
1015
@SharedImmutable
1004
1016
internal val NULL_VALUE : Symbol = Symbol (" NULL_VALUE" )
@@ -1022,7 +1034,11 @@ internal typealias Handler = (Throwable?) -> Unit
1022
1034
*/
1023
1035
internal abstract class Send : LockFreeLinkedListNode () {
1024
1036
abstract val pollResult: Any? // E | Closed
1025
- abstract fun tryResumeSend (idempotent : Any? ): Any?
1037
+ // Returns: null - failure,
1038
+ // RETRY_ATOMIC for retry (only when otherOp != null),
1039
+ // otherwise token for completeResumeSend
1040
+ // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1041
+ abstract fun tryResumeSend (otherOp : PrepareOp ? ): Any?
1026
1042
abstract fun completeResumeSend (token : Any )
1027
1043
abstract fun resumeSendClosed (closed : Closed <* >)
1028
1044
}
@@ -1032,7 +1048,11 @@ internal abstract class Send : LockFreeLinkedListNode() {
1032
1048
*/
1033
1049
internal interface ReceiveOrClosed <in E > {
1034
1050
val offerResult: Any // OFFER_SUCCESS | Closed
1035
- fun tryResumeReceive (value : E , idempotent : Any? ): Any?
1051
+ // Returns: null - failure,
1052
+ // RETRY_ATOMIC for retry (only when otherOp != null),
1053
+ // otherwise token for completeResumeReceive
1054
+ // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1055
+ fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any?
1036
1056
fun completeResumeReceive (token : Any )
1037
1057
}
1038
1058
@@ -1044,7 +1064,10 @@ internal class SendElement(
1044
1064
override val pollResult : Any? ,
1045
1065
@JvmField val cont : CancellableContinuation <Unit >
1046
1066
) : Send() {
1047
- override fun tryResumeSend (idempotent : Any? ): Any? = cont.tryResume(Unit , idempotent)
1067
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? {
1068
+ otherOp?.finishPrepare()
1069
+ return cont.tryResume(Unit , otherOp?.desc)
1070
+ }
1048
1071
override fun completeResumeSend (token : Any ) = cont.completeResume(token)
1049
1072
override fun resumeSendClosed (closed : Closed <* >) = cont.resumeWithException(closed.sendException)
1050
1073
override fun toString (): String = " SendElement($pollResult )"
@@ -1061,9 +1084,9 @@ internal class Closed<in E>(
1061
1084
1062
1085
override val offerResult get() = this
1063
1086
override val pollResult get() = this
1064
- override fun tryResumeSend (idempotent : Any ? ): Any? = CLOSE_RESUMED
1087
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED . also { otherOp?.finishPrepare() }
1065
1088
override fun completeResumeSend (token : Any ) { assert { token == = CLOSE_RESUMED } }
1066
- override fun tryResumeReceive (value : E , idempotent : Any ? ): Any? = CLOSE_RESUMED
1089
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED . also { otherOp?.finishPrepare() }
1067
1090
override fun completeResumeReceive (token : Any ) { assert { token == = CLOSE_RESUMED } }
1068
1091
override fun resumeSendClosed (closed : Closed <* >) = assert { false } // "Should be never invoked"
1069
1092
override fun toString (): String = " Closed[$closeCause ]"
0 commit comments