Skip to content

Commit d9d3574

Browse files
committed
Fix race in channel select/cancel
This bug was introduced by PR #1524. It was reproducing when there is a regular "send" operation on one side of a channel and "select { onReceive }" on another side of the channel and the "send" coroutine gets cancelled. The problem is that SendElement.tryResumeSend implementation was calling finishPrepare before it has successfully resumed continuation, so if that continuation was already cancelled, the code in "finishPrepare" had already stored the wrong affected node which it would later try to call "completeResume" on. This patch also adds hexAddress to the debug toString method of all internal node classes in channel implementation. Fixes #1588
1 parent cfc08ee commit d9d3574

File tree

2 files changed

+16
-12
lines changed

2 files changed

+16
-12
lines changed

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+15-11
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
466466
select.resumeSelectCancellableWithException(closed.sendException)
467467
}
468468

469-
override fun toString(): String = "SendSelect($pollResult)[$channel, $select]"
469+
override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
470470
}
471471

472472
internal class SendBuffered<out E>(
@@ -476,6 +476,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
476476
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
477477
override fun completeResumeSend() {}
478478
override fun resumeSendClosed(closed: Closed<*>) {}
479+
override fun toString(): String = "SendBuffered@$hexAddress($element)"
479480
}
480481
}
481482

@@ -899,9 +900,10 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
899900

900901
@Suppress("IMPLICIT_CAST_TO_ANY")
901902
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
902-
otherOp?.finishPrepare()
903903
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
904904
assert { token === RESUME_TOKEN } // the only other possible result
905+
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
906+
otherOp?.finishPrepare()
905907
return RESUME_TOKEN
906908
}
907909

@@ -914,17 +916,18 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
914916
else -> cont.resumeWithException(closed.receiveException)
915917
}
916918
}
917-
override fun toString(): String = "ReceiveElement[receiveMode=$receiveMode]"
919+
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
918920
}
919921

920922
private class ReceiveHasNext<E>(
921923
@JvmField val iterator: Itr<E>,
922924
@JvmField val cont: CancellableContinuation<Boolean>
923925
) : Receive<E>() {
924926
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
925-
otherOp?.finishPrepare()
926927
val token = cont.tryResume(true, otherOp?.desc) ?: return null
927928
assert { token === RESUME_TOKEN } // the only other possible result
929+
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
930+
otherOp?.finishPrepare()
928931
return RESUME_TOKEN
929932
}
930933

@@ -948,7 +951,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
948951
cont.completeResume(token)
949952
}
950953
}
951-
override fun toString(): String = "ReceiveHasNext"
954+
override fun toString(): String = "ReceiveHasNext@$hexAddress"
952955
}
953956

954957
private class ReceiveSelect<R, E>(
@@ -983,7 +986,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
983986
channel.onReceiveDequeued() // notify cancellation of receive
984987
}
985988

986-
override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
989+
override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
987990
}
988991
}
989992

@@ -1022,7 +1025,7 @@ internal abstract class Send : LockFreeLinkedListNode() {
10221025
// Returns: null - failure,
10231026
// RETRY_ATOMIC for retry (only when otherOp != null),
10241027
// RESUME_TOKEN on success (call completeResumeSend)
1025-
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1028+
// Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
10261029
abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
10271030
abstract fun completeResumeSend()
10281031
abstract fun resumeSendClosed(closed: Closed<*>)
@@ -1036,7 +1039,7 @@ internal interface ReceiveOrClosed<in E> {
10361039
// Returns: null - failure,
10371040
// RETRY_ATOMIC for retry (only when otherOp != null),
10381041
// RESUME_TOKEN on success (call completeResumeReceive)
1039-
// Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC
1042+
// Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
10401043
fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
10411044
fun completeResumeReceive(value: E)
10421045
}
@@ -1050,14 +1053,15 @@ internal class SendElement(
10501053
@JvmField val cont: CancellableContinuation<Unit>
10511054
) : Send() {
10521055
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1053-
otherOp?.finishPrepare()
10541056
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
10551057
assert { token === RESUME_TOKEN } // the only other possible result
1058+
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
1059+
otherOp?.finishPrepare() // finish preparations
10561060
return RESUME_TOKEN
10571061
}
10581062
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
10591063
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1060-
override fun toString(): String = "SendElement($pollResult)"
1064+
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
10611065
}
10621066

10631067
/**
@@ -1076,7 +1080,7 @@ internal class Closed<in E>(
10761080
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
10771081
override fun completeResumeReceive(value: E) {}
10781082
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
1079-
override fun toString(): String = "Closed[$closeCause]"
1083+
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
10801084
}
10811085

10821086
private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ public actual open class LockFreeLinkedListNode {
399399

400400
// Returns REMOVE_PREPARED or null (it makes decision on any failure)
401401
override fun perform(affected: Any?): Any? {
402-
assert(affected === this.affected)
402+
assert { affected === this.affected }
403403
affected as Node // type assertion
404404
val decision = desc.onPrepare(this)
405405
if (decision === REMOVE_PREPARED) {

0 commit comments

Comments
 (0)