From ddd2d9cb8a666f9dbd434abb93b87cb11616ad18 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Sun, 22 Sep 2019 21:13:12 +0300 Subject: [PATCH 1/2] Fix race condition in pair select This bug was introduced by #1524. The crux of problem is that TryOffer/PollDesc.onPrepare method is no longer allowed to update fields in these classes (like "resumeToken" and "pollResult") after call to tryResumeSend/Receive method, because the latter will complete the ongoing atomic operation and helper method might find it complete and try reading "resumeToken" which was not initialized yet. This change removes "pollResult" field which was not really needed ("result.pollResult" field is used) and removes "resumeToken" by exploiting the fact that current implementation of CancellableContinuationImpl does not need a token anymore. However, CancellableContinuation.tryResume/completeResume ABI is left intact, because it is used by 3rd party code. This fix lead to overall simplification of the code. A number of fields and an auxiliary IdempotentTokenValue class are removed, tokens used to indicate various results are consolidated, so that resume success is now consistently indicated by a single RESUME_TOKEN symbol. Fixes #1561 --- .../common/src/CancellableContinuationImpl.kt | 19 ++-- .../common/src/channels/AbstractChannel.kt | 101 ++++++------------ .../src/channels/ArrayBroadcastChannel.kt | 16 +-- .../common/src/channels/ArrayChannel.kt | 32 +++--- .../common/src/selects/Select.kt | 14 +-- 5 files changed, 74 insertions(+), 108 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 9a19ed61e3..559afb8289 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -14,6 +14,10 @@ private const val UNDECIDED = 0 private const val SUSPENDED = 1 private const val RESUMED = 2 +@JvmField +@SharedImmutable +internal val RESUME_TOKEN = Symbol("RESUME_TOKEN") + /** * @suppress **This is unstable API and it is subject to change.** */ @@ -347,20 +351,21 @@ internal open class CancellableContinuationImpl( parentHandle = NonDisposableHandle } + // Note: Always returns RESUME_TOKEN | null override fun tryResume(value: T, idempotent: Any?): Any? { _state.loop { state -> when (state) { is NotCompleted -> { val update: Any? = if (idempotent == null) value else - CompletedIdempotentResult(idempotent, value, state) + CompletedIdempotentResult(idempotent, value) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure detachChildIfNonResuable() - return state + return RESUME_TOKEN } is CompletedIdempotentResult -> { return if (state.idempotentResume === idempotent) { assert { state.result === value } // "Non-idempotent resume" - state.token + RESUME_TOKEN } else { null } @@ -377,15 +382,16 @@ internal open class CancellableContinuationImpl( val update = CompletedExceptionally(exception) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure detachChildIfNonResuable() - return state + return RESUME_TOKEN } else -> return null // cannot resume -- not active anymore } } } + // note: token is always RESUME_TOKEN override fun completeResume(token: Any) { - // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only + assert { token === RESUME_TOKEN } dispatchResume(resumeMode) } @@ -437,8 +443,7 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation private class CompletedIdempotentResult( @JvmField val idempotentResume: Any?, - @JvmField val result: Any?, - @JvmField val token: NotCompleted + @JvmField val result: Any? ) { override fun toString(): String = "CompletedIdempotentResult[$result]" } diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 22c1971c0a..9f672af10b 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -48,7 +48,8 @@ internal abstract class AbstractSendChannel : SendChannel { val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED val token = receive.tryResumeReceive(element, null) if (token != null) { - receive.completeResumeReceive(token) + assert { token === RESUME_TOKEN } + receive.completeResumeReceive(element) return receive.offerResult } } @@ -65,7 +66,7 @@ internal abstract class AbstractSendChannel : SendChannel { val failure = select.performAtomicTrySelect(offerOp) if (failure != null) return failure val receive = offerOp.result - receive.completeResumeReceive(offerOp.resumeToken!!) + receive.completeResumeReceive(element) return receive.offerResult } @@ -354,8 +355,6 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E, queue: LockFreeLinkedListHead ) : RemoveFirstDesc>(queue) { - @JvmField var resumeToken: Any? = null - override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { is Closed<*> -> affected !is ReceiveOrClosed<*> -> OFFER_FAILED @@ -367,7 +366,7 @@ internal abstract class AbstractSendChannel : SendChannel { val affected = prepareOp.affected as ReceiveOrClosed // see "failure" impl val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED if (token === RETRY_ATOMIC) return RETRY_ATOMIC - resumeToken = token + assert { token === RESUME_TOKEN } return null } } @@ -454,8 +453,7 @@ internal abstract class AbstractSendChannel : SendChannel { override fun tryResumeSend(otherOp: PrepareOp?): Any? = select.trySelectOther(otherOp) - override fun completeResumeSend(token: Any) { - assert { token === SELECT_STARTED } + override fun completeResumeSend() { block.startCoroutine(receiver = channel, completion = select.completion) } @@ -475,8 +473,8 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E ) : Send() { override val pollResult: Any? get() = element - override fun tryResumeSend(otherOp: PrepareOp?): Any? = SEND_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } } + override fun tryResumeSend(otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeSend() {} override fun resumeSendClosed(closed: Closed<*>) {} } } @@ -511,7 +509,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel(queue: LockFreeLinkedListHead) : RemoveFirstDesc(queue) { - @JvmField var resumeToken: Any? = null - @JvmField var pollResult: E? = null - override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { is Closed<*> -> affected !is Send -> POLL_FAILED @@ -687,8 +683,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel) { when { receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) @@ -925,25 +921,16 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel() { override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { otherOp?.finishPrepare() - val token = cont.tryResume(true, otherOp?.desc) - if (token != null) { - /* - When otherOp != null this invocation can be stale and we cannot directly update iterator.result - Instead, we save both token & result into a temporary IdempotentTokenValue object and - set iterator result only in completeResumeReceive that is going to be invoked just once - */ - if (otherOp != null) return IdempotentTokenValue(token, value) - iterator.result = value - } - return token + return cont.tryResume(true, otherOp?.desc) } - override fun completeResumeReceive(token: Any) { - if (token is IdempotentTokenValue<*>) { - iterator.result = token.value - cont.completeResume(token.token) - } else - cont.completeResume(token) + override fun completeResumeReceive(value: E) { + /* + When otherOp != null invocation of tryResumeReceive can happen multiple times and much later, + but completeResumeReceive is called once so we set iterator result here. + */ + iterator.result = value + cont.completeResume(RESUME_TOKEN) } override fun resumeReceiveClosed(closed: Closed<*>) { @@ -966,14 +953,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, @JvmField val receiveMode: Int ) : Receive(), DisposableHandle { - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { - val result = select.trySelectOther(otherOp) - return if (result === SELECT_STARTED) value ?: NULL_VALUE else result - } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = + select.trySelectOther(otherOp) @Suppress("UNCHECKED_CAST") - override fun completeResumeReceive(token: Any) { - val value: E = NULL_VALUE.unbox(token) + override fun completeResumeReceive(value: E) { block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion) } @@ -997,11 +981,6 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( - @JvmField val token: Any, - @JvmField val value: E - ) } // receiveMode values @@ -1025,18 +1004,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED") @SharedImmutable internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED") -@JvmField -@SharedImmutable -internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE") - -@JvmField -@SharedImmutable -internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED") - -@JvmField -@SharedImmutable -internal val SEND_RESUMED: Any = Symbol("SEND_RESUMED") - @JvmField @SharedImmutable internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED") @@ -1050,10 +1017,10 @@ internal abstract class Send : LockFreeLinkedListNode() { abstract val pollResult: Any? // E | Closed // Returns: null - failure, // RETRY_ATOMIC for retry (only when otherOp != null), - // otherwise token for completeResumeSend + // RESUME_TOKEN on success (call completeResumeSend) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC abstract fun tryResumeSend(otherOp: PrepareOp?): Any? - abstract fun completeResumeSend(token: Any) + abstract fun completeResumeSend() abstract fun resumeSendClosed(closed: Closed<*>) } @@ -1064,10 +1031,10 @@ internal interface ReceiveOrClosed { val offerResult: Any // OFFER_SUCCESS | Closed // Returns: null - failure, // RETRY_ATOMIC for retry (only when otherOp != null), - // otherwise token for completeResumeReceive + // RESUME_TOKEN on success (call completeResumeReceive) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? - fun completeResumeReceive(token: Any) + fun completeResumeReceive(value: E) } /** @@ -1082,7 +1049,7 @@ internal class SendElement( otherOp?.finishPrepare() return cont.tryResume(Unit, otherOp?.desc) } - override fun completeResumeSend(token: Any) = cont.completeResume(token) + override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN) override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException) override fun toString(): String = "SendElement($pollResult)" } @@ -1098,10 +1065,10 @@ internal class Closed( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } } - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } } + override fun tryResumeSend(otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeSend() {} + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed[$closeCause]" } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt index c9fb3cc74d..5f3eb32d8f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt @@ -143,7 +143,6 @@ internal class ArrayBroadcastChannel( private tailrec fun updateHead(addSub: Subscriber? = null, removeSub: Subscriber? = null) { // update head in a tail rec loop var send: Send? = null - var token: Any? = null bufferLock.withLock { if (addSub != null) { addSub.subHead = tail // start from last element @@ -172,8 +171,9 @@ internal class ArrayBroadcastChannel( while (true) { send = takeFirstSendOrPeekClosed() ?: break // when when no sender if (send is Closed<*>) break // break when closed for send - token = send!!.tryResumeSend(null) + val token = send!!.tryResumeSend(null) if (token != null) { + assert { token === RESUME_TOKEN } // put sent element to the buffer buffer[(tail % capacity).toInt()] = (send as Send).pollResult this.size = size + 1 @@ -186,7 +186,7 @@ internal class ArrayBroadcastChannel( return // done updating here -> return } // we only get out of the lock normally when there is a sender to resume - send!!.completeResumeSend(token!!) + send!!.completeResumeSend() // since we've just sent an element, we might need to resume some receivers checkSubOffers() // tailrec call to recheck @@ -239,9 +239,9 @@ internal class ArrayBroadcastChannel( // it means that `checkOffer` must be retried after every `unlock` if (!subLock.tryLock()) break val receive: ReceiveOrClosed? - val token: Any? + var result: Any? try { - val result = peekUnderLock() + result = peekUnderLock() when { result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock result is Closed<*> -> { @@ -252,15 +252,15 @@ internal class ArrayBroadcastChannel( // find a receiver for an element receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving if (receive is Closed<*>) break // noting more to do if this sub already closed - token = receive.tryResumeReceive(result as E, null) - if (token == null) continue // bail out here to next iteration (see for next receiver) + val token = receive.tryResumeReceive(result as E, null) ?: continue + assert { token === RESUME_TOKEN } val subHead = this.subHead this.subHead = subHead + 1 // retrieved element for this subscriber updated = true } finally { subLock.unlock() } - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(result as E) } // do close outside of lock if needed closed?.also { close(cause = it.closeCause) } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 0b850b27e9..f10713d95b 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -50,7 +50,6 @@ internal open class ArrayChannel( // result is `OFFER_SUCCESS | OFFER_FAILED | Closed` protected override fun offerInternal(element: E): Any { var receive: ReceiveOrClosed? = null - var token: Any? = null lock.withLock { val size = this.size closedForSend?.let { return it } @@ -65,8 +64,9 @@ internal open class ArrayChannel( this.size = size // restore size return receive!! } - token = receive!!.tryResumeReceive(element, null) + val token = receive!!.tryResumeReceive(element, null) if (token != null) { + assert { token === RESUME_TOKEN } this.size = size // restore size return@withLock } @@ -80,14 +80,13 @@ internal open class ArrayChannel( return OFFER_FAILED } // breaks here if offer meets receiver - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(element) return receive!!.offerResult } // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed` protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { var receive: ReceiveOrClosed? = null - var token: Any? = null lock.withLock { val size = this.size closedForSend?.let { return it } @@ -103,8 +102,6 @@ internal open class ArrayChannel( failure == null -> { // offered successfully this.size = size // restore size receive = offerOp.result - token = offerOp.resumeToken - assert { token != null } return@withLock } failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer @@ -130,7 +127,7 @@ internal open class ArrayChannel( return OFFER_FAILED } // breaks here if offer meets receiver - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(element) return receive!!.offerResult } @@ -150,7 +147,7 @@ internal open class ArrayChannel( // result is `E | POLL_FAILED | Closed` protected override fun pollInternal(): Any? { var send: Send? = null - var token: Any? = null + var resumed = false var result: Any? = null lock.withLock { val size = this.size @@ -164,8 +161,10 @@ internal open class ArrayChannel( if (size == capacity) { loop@ while (true) { send = takeFirstSendOrPeekClosed() ?: break - token = send!!.tryResumeSend(null) + val token = send!!.tryResumeSend(null) if (token != null) { + assert { token === RESUME_TOKEN } + resumed = true replacement = send!!.pollResult break@loop } @@ -178,15 +177,15 @@ internal open class ArrayChannel( head = (head + 1) % buffer.size } // complete send the we're taken replacement from - if (token != null) - send!!.completeResumeSend(token!!) + if (resumed) + send!!.completeResumeSend() return result } // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed` protected override fun pollSelectInternal(select: SelectInstance<*>): Any? { var send: Send? = null - var token: Any? = null + var success = false var result: Any? = null lock.withLock { val size = this.size @@ -204,8 +203,7 @@ internal open class ArrayChannel( when { failure == null -> { // polled successfully send = pollOp.result - token = pollOp.resumeToken - assert { token != null } + success = true replacement = send!!.pollResult break@loop } @@ -218,7 +216,7 @@ internal open class ArrayChannel( } failure is Closed<*> -> { send = failure - token = failure.tryResumeSend(null) + success = true replacement = failure break@loop } @@ -240,8 +238,8 @@ internal open class ArrayChannel( head = (head + 1) % buffer.size } // complete send the we're taken replacement from - if (token != null) - send!!.completeResumeSend(token!!) + if (success) + send!!.completeResumeSend() return result } diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 5318d14757..550ea1f59e 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -87,10 +87,6 @@ public interface SelectClause2 { public fun registerSelectClause2(select: SelectInstance, param: P, block: suspend (Q) -> R) } -@JvmField -@SharedImmutable -internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED") - /** * Internal representation of select instance. This instance is called _selected_ when * the clause to execute is already picked. @@ -111,7 +107,7 @@ public interface SelectInstance { /** * Tries to select this instance. Returns: - * * [SELECT_STARTED] on success, + * * [RESUME_TOKEN] on success, * * [RETRY_ATOMIC] on deadlock (needs retry, it is only possible when [otherOp] is not `null`) * * `null` on failure to select (already selected). * [otherOp] is not null when trying to rendezvous with this select from inside of another select. @@ -370,7 +366,7 @@ internal class SelectBuilderImpl( override fun trySelect(): Boolean { val result = trySelectOther(null) return when { - result === SELECT_STARTED -> true + result === RESUME_TOKEN -> true result == null -> false else -> error("Unexpected trySelectIdempotent result $result") } @@ -460,7 +456,7 @@ internal class SelectBuilderImpl( */ // it is just like plain trySelect, but support idempotent start - // Returns SELECT_STARTED | RETRY_ATOMIC | null (when already selected) + // Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected) override fun trySelectOther(otherOp: PrepareOp?): Any? { _state.loop { state -> // lock-free loop on state when { @@ -477,7 +473,7 @@ internal class SelectBuilderImpl( if (decision !== null) return decision } doAfterSelect() - return SELECT_STARTED + return RESUME_TOKEN } state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp // Found descriptor of ongoing operation while working in the context of other select operation @@ -512,7 +508,7 @@ internal class SelectBuilderImpl( } // otherwise -- already selected otherOp == null -> return null // already selected - state === otherOp.desc -> return SELECT_STARTED // was selected with this marker + state === otherOp.desc -> return RESUME_TOKEN // was selected with this marker else -> return null // selected with different marker } } From 707c01e91f39455f9524ecbb22604b5b86103a5f Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 26 Sep 2019 12:47:54 +0300 Subject: [PATCH 2/2] Narrow down return type of tryResumeSend/Receive They return Symbol? --- .../common/src/channels/AbstractChannel.kt | 36 +++++++++++-------- .../common/src/selects/Select.kt | 6 +++- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 9f672af10b..ce0acc0f53 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -450,8 +450,8 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val select: SelectInstance, @JvmField val block: suspend (SendChannel) -> R ) : Send(), DisposableHandle { - override fun tryResumeSend(otherOp: PrepareOp?): Any? = - select.trySelectOther(otherOp) + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = + select.trySelectOther(otherOp) as Symbol? // must return symbol override fun completeResumeSend() { block.startCoroutine(receiver = channel, completion = select.completion) @@ -473,7 +473,7 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E ) : Send() { override val pollResult: Any? get() = element - override fun tryResumeSend(otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} override fun resumeSendClosed(closed: Closed<*>) {} } @@ -898,9 +898,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel, @JvmField val cont: CancellableContinuation ) : Receive() { - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? { otherOp?.finishPrepare() - return cont.tryResume(true, otherOp?.desc) + val token = cont.tryResume(true, otherOp?.desc) ?: return null + assert { token === RESUME_TOKEN } // the only other possible result + return RESUME_TOKEN } override fun completeResumeReceive(value: E) { @@ -953,8 +957,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, @JvmField val receiveMode: Int ) : Receive(), DisposableHandle { - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = - select.trySelectOther(otherOp) + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = + select.trySelectOther(otherOp) as Symbol? @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { @@ -1019,7 +1023,7 @@ internal abstract class Send : LockFreeLinkedListNode() { // RETRY_ATOMIC for retry (only when otherOp != null), // RESUME_TOKEN on success (call completeResumeSend) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC - abstract fun tryResumeSend(otherOp: PrepareOp?): Any? + abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol? abstract fun completeResumeSend() abstract fun resumeSendClosed(closed: Closed<*>) } @@ -1033,7 +1037,7 @@ internal interface ReceiveOrClosed { // RETRY_ATOMIC for retry (only when otherOp != null), // RESUME_TOKEN on success (call completeResumeReceive) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC - fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? + fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? fun completeResumeReceive(value: E) } @@ -1045,9 +1049,11 @@ internal class SendElement( override val pollResult: Any?, @JvmField val cont: CancellableContinuation ) : Send() { - override fun tryResumeSend(otherOp: PrepareOp?): Any? { + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? { otherOp?.finishPrepare() - return cont.tryResume(Unit, otherOp?.desc) + val token = cont.tryResume(Unit, otherOp?.desc) + assert { token === RESUME_TOKEN } // the only other possible result + return RESUME_TOKEN } override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN) override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException) @@ -1065,9 +1071,9 @@ internal class Closed( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed[$closeCause]" diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 550ea1f59e..372c9e32dc 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -93,7 +93,7 @@ public interface SelectClause2 { * * @suppress **This is unstable API and it is subject to change.** */ -@InternalCoroutinesApi +@InternalCoroutinesApi // todo: sealed interface https://youtrack.jetbrains.com/issue/KT-22286 public interface SelectInstance { /** * Returns `true` when this [select] statement had already picked a clause to execute. @@ -112,6 +112,10 @@ public interface SelectInstance { * * `null` on failure to select (already selected). * [otherOp] is not null when trying to rendezvous with this select from inside of another select. * In this case, [PrepareOp.finishPrepare] must be called before deciding on any value other than [RETRY_ATOMIC]. + * + * Note, that this method's actual return type is `Symbol?` but we cannot declare it as such, because this + * member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface` + * (see KT-222860) we can declare this method as internal. */ public fun trySelectOther(otherOp: PrepareOp?): Any?