@@ -48,7 +48,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
48
48
val receive = takeFirstReceiveOrPeekClosed() ? : return OFFER_FAILED
49
49
val token = receive.tryResumeReceive(element, null )
50
50
if (token != null ) {
51
- receive.completeResumeReceive(token)
51
+ assert { token == = RESUME_TOKEN }
52
+ receive.completeResumeReceive(element)
52
53
return receive.offerResult
53
54
}
54
55
}
@@ -65,7 +66,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
65
66
val failure = select.performAtomicTrySelect(offerOp)
66
67
if (failure != null ) return failure
67
68
val receive = offerOp.result
68
- receive.completeResumeReceive(offerOp.resumeToken !! )
69
+ receive.completeResumeReceive(element )
69
70
return receive.offerResult
70
71
}
71
72
@@ -354,8 +355,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
354
355
@JvmField val element : E ,
355
356
queue : LockFreeLinkedListHead
356
357
) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
357
- @JvmField var resumeToken: Any? = null
358
-
359
358
override fun failure (affected : LockFreeLinkedListNode ): Any? = when (affected) {
360
359
is Closed <* > -> affected
361
360
!is ReceiveOrClosed <* > -> OFFER_FAILED
@@ -367,7 +366,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
367
366
val affected = prepareOp.affected as ReceiveOrClosed <E > // see "failure" impl
368
367
val token = affected.tryResumeReceive(element, prepareOp) ? : return REMOVE_PREPARED
369
368
if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
370
- resumeToken = token
369
+ assert { token == = RESUME_TOKEN }
371
370
return null
372
371
}
373
372
}
@@ -454,8 +453,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
454
453
override fun tryResumeSend (otherOp : PrepareOp ? ): Any? =
455
454
select.trySelectOther(otherOp)
456
455
457
- override fun completeResumeSend (token : Any ) {
458
- assert { token == = SELECT_STARTED }
456
+ override fun completeResumeSend () {
459
457
block.startCoroutine(receiver = channel, completion = select.completion)
460
458
}
461
459
@@ -475,8 +473,8 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
475
473
@JvmField val element : E
476
474
) : Send() {
477
475
override val pollResult: Any? get() = element
478
- override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = SEND_RESUMED .also { otherOp?.finishPrepare() }
479
- override fun completeResumeSend (token : Any ) { assert { token == = SEND_RESUMED } }
476
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = RESUME_TOKEN .also { otherOp?.finishPrepare() }
477
+ override fun completeResumeSend () {}
480
478
override fun resumeSendClosed (closed : Closed <* >) {}
481
479
}
482
480
}
@@ -511,7 +509,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
511
509
val send = takeFirstSendOrPeekClosed() ? : return POLL_FAILED
512
510
val token = send.tryResumeSend(null )
513
511
if (token != null ) {
514
- send.completeResumeSend(token)
512
+ assert { token == = RESUME_TOKEN }
513
+ send.completeResumeSend()
515
514
return send.pollResult
516
515
}
517
516
}
@@ -528,8 +527,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
528
527
val failure = select.performAtomicTrySelect(pollOp)
529
528
if (failure != null ) return failure
530
529
val send = pollOp.result
531
- send.completeResumeSend(pollOp.resumeToken !! )
532
- return pollOp.pollResult
530
+ send.completeResumeSend()
531
+ return pollOp.result. pollResult
533
532
}
534
533
535
534
// ------ state functions & helpers for concrete implementations ------
@@ -673,9 +672,6 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
673
672
* @suppress **This is unstable API and it is subject to change.**
674
673
*/
675
674
protected class TryPollDesc <E >(queue : LockFreeLinkedListHead ) : RemoveFirstDesc<Send>(queue) {
676
- @JvmField var resumeToken: Any? = null
677
- @JvmField var pollResult: E ? = null
678
-
679
675
override fun failure (affected : LockFreeLinkedListNode ): Any? = when (affected) {
680
676
is Closed <* > -> affected
681
677
!is Send -> POLL_FAILED
@@ -687,8 +683,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
687
683
val affected = prepareOp.affected as Send // see "failure" impl
688
684
val token = affected.tryResumeSend(prepareOp) ? : return REMOVE_PREPARED
689
685
if (token == = RETRY_ATOMIC ) return RETRY_ATOMIC
690
- resumeToken = token
691
- pollResult = affected.pollResult as E
686
+ assert { token == = RESUME_TOKEN }
692
687
return null
693
688
}
694
689
}
@@ -908,7 +903,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
908
903
return cont.tryResume(resumeValue(value), otherOp?.desc)
909
904
}
910
905
911
- override fun completeResumeReceive (token : Any ) = cont.completeResume(token)
906
+ override fun completeResumeReceive (value : E ) = cont.completeResume(RESUME_TOKEN )
907
+
912
908
override fun resumeReceiveClosed (closed : Closed <* >) {
913
909
when {
914
910
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null )
@@ -925,25 +921,16 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
925
921
) : Receive<E>() {
926
922
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
927
923
otherOp?.finishPrepare()
928
- val token = cont.tryResume(true , otherOp?.desc)
929
- if (token != null ) {
930
- /*
931
- When otherOp != null this invocation can be stale and we cannot directly update iterator.result
932
- Instead, we save both token & result into a temporary IdempotentTokenValue object and
933
- set iterator result only in completeResumeReceive that is going to be invoked just once
934
- */
935
- if (otherOp != null ) return IdempotentTokenValue (token, value)
936
- iterator.result = value
937
- }
938
- return token
924
+ return cont.tryResume(true , otherOp?.desc)
939
925
}
940
926
941
- override fun completeResumeReceive (token : Any ) {
942
- if (token is IdempotentTokenValue <* >) {
943
- iterator.result = token.value
944
- cont.completeResume(token.token)
945
- } else
946
- cont.completeResume(token)
927
+ override fun completeResumeReceive (value : E ) {
928
+ /*
929
+ When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
930
+ but completeResumeReceive is called once so we set iterator result here.
931
+ */
932
+ iterator.result = value
933
+ cont.completeResume(RESUME_TOKEN )
947
934
}
948
935
949
936
override fun resumeReceiveClosed (closed : Closed <* >) {
@@ -966,14 +953,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
966
953
@JvmField val block : suspend (Any? ) -> R ,
967
954
@JvmField val receiveMode : Int
968
955
) : Receive<E>(), DisposableHandle {
969
- override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? {
970
- val result = select.trySelectOther(otherOp)
971
- return if (result == = SELECT_STARTED ) value ? : NULL_VALUE else result
972
- }
956
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? =
957
+ select.trySelectOther(otherOp)
973
958
974
959
@Suppress(" UNCHECKED_CAST" )
975
- override fun completeResumeReceive (token : Any ) {
976
- val value: E = NULL_VALUE .unbox<E >(token)
960
+ override fun completeResumeReceive (value : E ) {
977
961
block.startCoroutine(if (receiveMode == RECEIVE_RESULT ) ValueOrClosed .value(value) else value, select.completion)
978
962
}
979
963
@@ -997,11 +981,6 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
997
981
998
982
override fun toString (): String = " ReceiveSelect[$select ,receiveMode=$receiveMode ]"
999
983
}
1000
-
1001
- private class IdempotentTokenValue <out E >(
1002
- @JvmField val token : Any ,
1003
- @JvmField val value : E
1004
- )
1005
984
}
1006
985
1007
986
// receiveMode values
@@ -1025,18 +1004,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
1025
1004
@SharedImmutable
1026
1005
internal val ENQUEUE_FAILED : Any = Symbol (" ENQUEUE_FAILED" )
1027
1006
1028
- @JvmField
1029
- @SharedImmutable
1030
- internal val NULL_VALUE : Symbol = Symbol (" NULL_VALUE" )
1031
-
1032
- @JvmField
1033
- @SharedImmutable
1034
- internal val CLOSE_RESUMED : Any = Symbol (" CLOSE_RESUMED" )
1035
-
1036
- @JvmField
1037
- @SharedImmutable
1038
- internal val SEND_RESUMED : Any = Symbol (" SEND_RESUMED" )
1039
-
1040
1007
@JvmField
1041
1008
@SharedImmutable
1042
1009
internal val HANDLER_INVOKED : Any = Symbol (" ON_CLOSE_HANDLER_INVOKED" )
@@ -1050,10 +1017,10 @@ internal abstract class Send : LockFreeLinkedListNode() {
1050
1017
abstract val pollResult: Any? // E | Closed
1051
1018
// Returns: null - failure,
1052
1019
// RETRY_ATOMIC for retry (only when otherOp != null),
1053
- // otherwise token for completeResumeSend
1020
+ // RESUME_TOKEN on success (call completeResumeSend)
1054
1021
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1055
1022
abstract fun tryResumeSend (otherOp : PrepareOp ? ): Any?
1056
- abstract fun completeResumeSend (token : Any )
1023
+ abstract fun completeResumeSend ()
1057
1024
abstract fun resumeSendClosed (closed : Closed <* >)
1058
1025
}
1059
1026
@@ -1064,10 +1031,10 @@ internal interface ReceiveOrClosed<in E> {
1064
1031
val offerResult: Any // OFFER_SUCCESS | Closed
1065
1032
// Returns: null - failure,
1066
1033
// RETRY_ATOMIC for retry (only when otherOp != null),
1067
- // otherwise token for completeResumeReceive
1034
+ // RESUME_TOKEN on success (call completeResumeReceive)
1068
1035
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1069
1036
fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any?
1070
- fun completeResumeReceive (token : Any )
1037
+ fun completeResumeReceive (value : E )
1071
1038
}
1072
1039
1073
1040
/* *
@@ -1082,7 +1049,7 @@ internal class SendElement(
1082
1049
otherOp?.finishPrepare()
1083
1050
return cont.tryResume(Unit , otherOp?.desc)
1084
1051
}
1085
- override fun completeResumeSend (token : Any ) = cont.completeResume(token )
1052
+ override fun completeResumeSend () = cont.completeResume(RESUME_TOKEN )
1086
1053
override fun resumeSendClosed (closed : Closed <* >) = cont.resumeWithException(closed.sendException)
1087
1054
override fun toString (): String = " SendElement($pollResult )"
1088
1055
}
@@ -1098,10 +1065,10 @@ internal class Closed<in E>(
1098
1065
1099
1066
override val offerResult get() = this
1100
1067
override val pollResult get() = this
1101
- override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED .also { otherOp?.finishPrepare() }
1102
- override fun completeResumeSend (token : Any ) { assert { token == = CLOSE_RESUMED } }
1103
- override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? = CLOSE_RESUMED .also { otherOp?.finishPrepare() }
1104
- override fun completeResumeReceive (token : Any ) { assert { token == = CLOSE_RESUMED } }
1068
+ override fun tryResumeSend (otherOp : PrepareOp ? ): Any? = RESUME_TOKEN .also { otherOp?.finishPrepare() }
1069
+ override fun completeResumeSend () {}
1070
+ override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Any? = RESUME_TOKEN .also { otherOp?.finishPrepare() }
1071
+ override fun completeResumeReceive (value : E ) {}
1105
1072
override fun resumeSendClosed (closed : Closed <* >) = assert { false } // "Should be never invoked"
1106
1073
override fun toString (): String = " Closed[$closeCause ]"
1107
1074
}
0 commit comments