@@ -466,7 +466,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
466
466
select.resumeSelectCancellableWithException(closed.sendException)
467
467
}
468
468
469
- override fun toString (): String = " SendSelect($pollResult )[$channel , $select ]"
469
+ override fun toString (): String = " SendSelect@ $hexAddress ($pollResult )[$channel , $select ]"
470
470
}
471
471
472
472
internal class SendBuffered <out E >(
@@ -476,6 +476,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
476
476
override fun tryResumeSend (otherOp : PrepareOp ? ): Symbol ? = RESUME_TOKEN .also { otherOp?.finishPrepare() }
477
477
override fun completeResumeSend () {}
478
478
override fun resumeSendClosed (closed : Closed <* >) {}
479
+ override fun toString (): String = " SendBuffered@$hexAddress ($element )"
479
480
}
480
481
}
481
482
@@ -899,9 +900,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
899
900
900
901
@Suppress(" IMPLICIT_CAST_TO_ANY" )
901
902
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? {
902
- otherOp?.finishPrepare()
903
903
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ? : return null
904
904
assert { token == = RESUME_TOKEN } // the only other possible result
905
+ // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
906
+ otherOp?.finishPrepare()
905
907
return RESUME_TOKEN
906
908
}
907
909
@@ -914,17 +916,18 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
914
916
else -> cont.resumeWithException(closed.receiveException)
915
917
}
916
918
}
917
- override fun toString (): String = " ReceiveElement[receiveMode=$receiveMode ]"
919
+ override fun toString (): String = " ReceiveElement@ $hexAddress [receiveMode=$receiveMode ]"
918
920
}
919
921
920
922
private class ReceiveHasNext <E >(
921
923
@JvmField val iterator : Itr <E >,
922
924
@JvmField val cont : CancellableContinuation <Boolean >
923
925
) : Receive<E>() {
924
926
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? {
925
- otherOp?.finishPrepare()
926
927
val token = cont.tryResume(true , otherOp?.desc) ? : return null
927
928
assert { token == = RESUME_TOKEN } // the only other possible result
929
+ // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
930
+ otherOp?.finishPrepare()
928
931
return RESUME_TOKEN
929
932
}
930
933
@@ -948,7 +951,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
948
951
cont.completeResume(token)
949
952
}
950
953
}
951
- override fun toString (): String = " ReceiveHasNext"
954
+ override fun toString (): String = " ReceiveHasNext@ $hexAddress "
952
955
}
953
956
954
957
private class ReceiveSelect <R , E >(
@@ -983,7 +986,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
983
986
channel.onReceiveDequeued() // notify cancellation of receive
984
987
}
985
988
986
- override fun toString (): String = " ReceiveSelect[$select ,receiveMode=$receiveMode ]"
989
+ override fun toString (): String = " ReceiveSelect@ $hexAddress [$select ,receiveMode=$receiveMode ]"
987
990
}
988
991
}
989
992
@@ -1022,7 +1025,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
1022
1025
// Returns: null - failure,
1023
1026
// RETRY_ATOMIC for retry (only when otherOp != null),
1024
1027
// RESUME_TOKEN on success (call completeResumeSend)
1025
- // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1028
+ // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
1026
1029
abstract fun tryResumeSend (otherOp : PrepareOp ? ): Symbol ?
1027
1030
abstract fun completeResumeSend ()
1028
1031
abstract fun resumeSendClosed (closed : Closed <* >)
@@ -1036,7 +1039,7 @@ internal interface ReceiveOrClosed<in E> {
1036
1039
// Returns: null - failure,
1037
1040
// RETRY_ATOMIC for retry (only when otherOp != null),
1038
1041
// RESUME_TOKEN on success (call completeResumeReceive)
1039
- // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1042
+ // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
1040
1043
fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ?
1041
1044
fun completeResumeReceive (value : E )
1042
1045
}
@@ -1050,14 +1053,15 @@ internal class SendElement(
1050
1053
@JvmField val cont : CancellableContinuation <Unit >
1051
1054
) : Send() {
1052
1055
override fun tryResumeSend (otherOp : PrepareOp ? ): Symbol ? {
1053
- otherOp?.finishPrepare()
1054
1056
val token = cont.tryResume(Unit , otherOp?.desc) ? : return null
1055
1057
assert { token == = RESUME_TOKEN } // the only other possible result
1058
+ // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
1059
+ otherOp?.finishPrepare() // finish preparations
1056
1060
return RESUME_TOKEN
1057
1061
}
1058
1062
override fun completeResumeSend () = cont.completeResume(RESUME_TOKEN )
1059
1063
override fun resumeSendClosed (closed : Closed <* >) = cont.resumeWithException(closed.sendException)
1060
- override fun toString (): String = " SendElement($pollResult )"
1064
+ override fun toString (): String = " SendElement@ $hexAddress ($pollResult )"
1061
1065
}
1062
1066
1063
1067
/* *
@@ -1076,7 +1080,7 @@ internal class Closed<in E>(
1076
1080
override fun tryResumeReceive (value : E , otherOp : PrepareOp ? ): Symbol ? = RESUME_TOKEN .also { otherOp?.finishPrepare() }
1077
1081
override fun completeResumeReceive (value : E ) {}
1078
1082
override fun resumeSendClosed (closed : Closed <* >) = assert { false } // "Should be never invoked"
1079
- override fun toString (): String = " Closed[$closeCause ]"
1083
+ override fun toString (): String = " Closed@ $hexAddress [$closeCause ]"
1080
1084
}
1081
1085
1082
1086
private abstract class Receive <in E > : LockFreeLinkedListNode (), ReceiveOrClosed<E> {
0 commit comments