diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 9a19ed61e3..559afb8289 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -14,6 +14,10 @@ private const val UNDECIDED = 0 private const val SUSPENDED = 1 private const val RESUMED = 2 +@JvmField +@SharedImmutable +internal val RESUME_TOKEN = Symbol("RESUME_TOKEN") + /** * @suppress **This is unstable API and it is subject to change.** */ @@ -347,20 +351,21 @@ internal open class CancellableContinuationImpl( parentHandle = NonDisposableHandle } + // Note: Always returns RESUME_TOKEN | null override fun tryResume(value: T, idempotent: Any?): Any? { _state.loop { state -> when (state) { is NotCompleted -> { val update: Any? = if (idempotent == null) value else - CompletedIdempotentResult(idempotent, value, state) + CompletedIdempotentResult(idempotent, value) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure detachChildIfNonResuable() - return state + return RESUME_TOKEN } is CompletedIdempotentResult -> { return if (state.idempotentResume === idempotent) { assert { state.result === value } // "Non-idempotent resume" - state.token + RESUME_TOKEN } else { null } @@ -377,15 +382,16 @@ internal open class CancellableContinuationImpl( val update = CompletedExceptionally(exception) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure detachChildIfNonResuable() - return state + return RESUME_TOKEN } else -> return null // cannot resume -- not active anymore } } } + // note: token is always RESUME_TOKEN override fun completeResume(token: Any) { - // note: We don't actually use token anymore, because handler needs to be invoked on cancellation only + assert { token === RESUME_TOKEN } dispatchResume(resumeMode) } @@ -437,8 +443,7 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation private class CompletedIdempotentResult( @JvmField val idempotentResume: Any?, - @JvmField val result: Any?, - @JvmField val token: NotCompleted + @JvmField val result: Any? ) { override fun toString(): String = "CompletedIdempotentResult[$result]" } diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 22c1971c0a..ce0acc0f53 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -48,7 +48,8 @@ internal abstract class AbstractSendChannel : SendChannel { val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED val token = receive.tryResumeReceive(element, null) if (token != null) { - receive.completeResumeReceive(token) + assert { token === RESUME_TOKEN } + receive.completeResumeReceive(element) return receive.offerResult } } @@ -65,7 +66,7 @@ internal abstract class AbstractSendChannel : SendChannel { val failure = select.performAtomicTrySelect(offerOp) if (failure != null) return failure val receive = offerOp.result - receive.completeResumeReceive(offerOp.resumeToken!!) + receive.completeResumeReceive(element) return receive.offerResult } @@ -354,8 +355,6 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E, queue: LockFreeLinkedListHead ) : RemoveFirstDesc>(queue) { - @JvmField var resumeToken: Any? = null - override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { is Closed<*> -> affected !is ReceiveOrClosed<*> -> OFFER_FAILED @@ -367,7 +366,7 @@ internal abstract class AbstractSendChannel : SendChannel { val affected = prepareOp.affected as ReceiveOrClosed // see "failure" impl val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED if (token === RETRY_ATOMIC) return RETRY_ATOMIC - resumeToken = token + assert { token === RESUME_TOKEN } return null } } @@ -451,11 +450,10 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val select: SelectInstance, @JvmField val block: suspend (SendChannel) -> R ) : Send(), DisposableHandle { - override fun tryResumeSend(otherOp: PrepareOp?): Any? = - select.trySelectOther(otherOp) + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = + select.trySelectOther(otherOp) as Symbol? // must return symbol - override fun completeResumeSend(token: Any) { - assert { token === SELECT_STARTED } + override fun completeResumeSend() { block.startCoroutine(receiver = channel, completion = select.completion) } @@ -475,8 +473,8 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E ) : Send() { override val pollResult: Any? get() = element - override fun tryResumeSend(otherOp: PrepareOp?): Any? = SEND_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeSend() {} override fun resumeSendClosed(closed: Closed<*>) {} } } @@ -511,7 +509,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel(queue: LockFreeLinkedListHead) : RemoveFirstDesc(queue) { - @JvmField var resumeToken: Any? = null - @JvmField var pollResult: E? = null - override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { is Closed<*> -> affected !is Send -> POLL_FAILED @@ -687,8 +683,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel) { when { receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) @@ -923,27 +921,20 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel, @JvmField val cont: CancellableContinuation ) : Receive() { - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? { otherOp?.finishPrepare() - val token = cont.tryResume(true, otherOp?.desc) - if (token != null) { - /* - When otherOp != null this invocation can be stale and we cannot directly update iterator.result - Instead, we save both token & result into a temporary IdempotentTokenValue object and - set iterator result only in completeResumeReceive that is going to be invoked just once - */ - if (otherOp != null) return IdempotentTokenValue(token, value) - iterator.result = value - } - return token + val token = cont.tryResume(true, otherOp?.desc) ?: return null + assert { token === RESUME_TOKEN } // the only other possible result + return RESUME_TOKEN } - override fun completeResumeReceive(token: Any) { - if (token is IdempotentTokenValue<*>) { - iterator.result = token.value - cont.completeResume(token.token) - } else - cont.completeResume(token) + override fun completeResumeReceive(value: E) { + /* + When otherOp != null invocation of tryResumeReceive can happen multiple times and much later, + but completeResumeReceive is called once so we set iterator result here. + */ + iterator.result = value + cont.completeResume(RESUME_TOKEN) } override fun resumeReceiveClosed(closed: Closed<*>) { @@ -966,14 +957,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, @JvmField val receiveMode: Int ) : Receive(), DisposableHandle { - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { - val result = select.trySelectOther(otherOp) - return if (result === SELECT_STARTED) value ?: NULL_VALUE else result - } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = + select.trySelectOther(otherOp) as Symbol? @Suppress("UNCHECKED_CAST") - override fun completeResumeReceive(token: Any) { - val value: E = NULL_VALUE.unbox(token) + override fun completeResumeReceive(value: E) { block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion) } @@ -997,11 +985,6 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( - @JvmField val token: Any, - @JvmField val value: E - ) } // receiveMode values @@ -1025,18 +1008,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED") @SharedImmutable internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED") -@JvmField -@SharedImmutable -internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE") - -@JvmField -@SharedImmutable -internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED") - -@JvmField -@SharedImmutable -internal val SEND_RESUMED: Any = Symbol("SEND_RESUMED") - @JvmField @SharedImmutable internal val HANDLER_INVOKED: Any = Symbol("ON_CLOSE_HANDLER_INVOKED") @@ -1050,10 +1021,10 @@ internal abstract class Send : LockFreeLinkedListNode() { abstract val pollResult: Any? // E | Closed // Returns: null - failure, // RETRY_ATOMIC for retry (only when otherOp != null), - // otherwise token for completeResumeSend + // RESUME_TOKEN on success (call completeResumeSend) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC - abstract fun tryResumeSend(otherOp: PrepareOp?): Any? - abstract fun completeResumeSend(token: Any) + abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol? + abstract fun completeResumeSend() abstract fun resumeSendClosed(closed: Closed<*>) } @@ -1064,10 +1035,10 @@ internal interface ReceiveOrClosed { val offerResult: Any // OFFER_SUCCESS | Closed // Returns: null - failure, // RETRY_ATOMIC for retry (only when otherOp != null), - // otherwise token for completeResumeReceive + // RESUME_TOKEN on success (call completeResumeReceive) // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC - fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? - fun completeResumeReceive(token: Any) + fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? + fun completeResumeReceive(value: E) } /** @@ -1078,11 +1049,13 @@ internal class SendElement( override val pollResult: Any?, @JvmField val cont: CancellableContinuation ) : Send() { - override fun tryResumeSend(otherOp: PrepareOp?): Any? { + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? { otherOp?.finishPrepare() - return cont.tryResume(Unit, otherOp?.desc) + val token = cont.tryResume(Unit, otherOp?.desc) + assert { token === RESUME_TOKEN } // the only other possible result + return RESUME_TOKEN } - override fun completeResumeSend(token: Any) = cont.completeResume(token) + override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN) override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException) override fun toString(): String = "SendElement($pollResult)" } @@ -1098,10 +1071,10 @@ internal class Closed( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } } - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() } - override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeSend() {} + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed[$closeCause]" } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt index c9fb3cc74d..5f3eb32d8f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt @@ -143,7 +143,6 @@ internal class ArrayBroadcastChannel( private tailrec fun updateHead(addSub: Subscriber? = null, removeSub: Subscriber? = null) { // update head in a tail rec loop var send: Send? = null - var token: Any? = null bufferLock.withLock { if (addSub != null) { addSub.subHead = tail // start from last element @@ -172,8 +171,9 @@ internal class ArrayBroadcastChannel( while (true) { send = takeFirstSendOrPeekClosed() ?: break // when when no sender if (send is Closed<*>) break // break when closed for send - token = send!!.tryResumeSend(null) + val token = send!!.tryResumeSend(null) if (token != null) { + assert { token === RESUME_TOKEN } // put sent element to the buffer buffer[(tail % capacity).toInt()] = (send as Send).pollResult this.size = size + 1 @@ -186,7 +186,7 @@ internal class ArrayBroadcastChannel( return // done updating here -> return } // we only get out of the lock normally when there is a sender to resume - send!!.completeResumeSend(token!!) + send!!.completeResumeSend() // since we've just sent an element, we might need to resume some receivers checkSubOffers() // tailrec call to recheck @@ -239,9 +239,9 @@ internal class ArrayBroadcastChannel( // it means that `checkOffer` must be retried after every `unlock` if (!subLock.tryLock()) break val receive: ReceiveOrClosed? - val token: Any? + var result: Any? try { - val result = peekUnderLock() + result = peekUnderLock() when { result === POLL_FAILED -> continue@loop // must retest `needsToCheckOfferWithoutLock` outside of the lock result is Closed<*> -> { @@ -252,15 +252,15 @@ internal class ArrayBroadcastChannel( // find a receiver for an element receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving if (receive is Closed<*>) break // noting more to do if this sub already closed - token = receive.tryResumeReceive(result as E, null) - if (token == null) continue // bail out here to next iteration (see for next receiver) + val token = receive.tryResumeReceive(result as E, null) ?: continue + assert { token === RESUME_TOKEN } val subHead = this.subHead this.subHead = subHead + 1 // retrieved element for this subscriber updated = true } finally { subLock.unlock() } - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(result as E) } // do close outside of lock if needed closed?.also { close(cause = it.closeCause) } diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 0b850b27e9..f10713d95b 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -50,7 +50,6 @@ internal open class ArrayChannel( // result is `OFFER_SUCCESS | OFFER_FAILED | Closed` protected override fun offerInternal(element: E): Any { var receive: ReceiveOrClosed? = null - var token: Any? = null lock.withLock { val size = this.size closedForSend?.let { return it } @@ -65,8 +64,9 @@ internal open class ArrayChannel( this.size = size // restore size return receive!! } - token = receive!!.tryResumeReceive(element, null) + val token = receive!!.tryResumeReceive(element, null) if (token != null) { + assert { token === RESUME_TOKEN } this.size = size // restore size return@withLock } @@ -80,14 +80,13 @@ internal open class ArrayChannel( return OFFER_FAILED } // breaks here if offer meets receiver - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(element) return receive!!.offerResult } // result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed` protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { var receive: ReceiveOrClosed? = null - var token: Any? = null lock.withLock { val size = this.size closedForSend?.let { return it } @@ -103,8 +102,6 @@ internal open class ArrayChannel( failure == null -> { // offered successfully this.size = size // restore size receive = offerOp.result - token = offerOp.resumeToken - assert { token != null } return@withLock } failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer @@ -130,7 +127,7 @@ internal open class ArrayChannel( return OFFER_FAILED } // breaks here if offer meets receiver - receive!!.completeResumeReceive(token!!) + receive!!.completeResumeReceive(element) return receive!!.offerResult } @@ -150,7 +147,7 @@ internal open class ArrayChannel( // result is `E | POLL_FAILED | Closed` protected override fun pollInternal(): Any? { var send: Send? = null - var token: Any? = null + var resumed = false var result: Any? = null lock.withLock { val size = this.size @@ -164,8 +161,10 @@ internal open class ArrayChannel( if (size == capacity) { loop@ while (true) { send = takeFirstSendOrPeekClosed() ?: break - token = send!!.tryResumeSend(null) + val token = send!!.tryResumeSend(null) if (token != null) { + assert { token === RESUME_TOKEN } + resumed = true replacement = send!!.pollResult break@loop } @@ -178,15 +177,15 @@ internal open class ArrayChannel( head = (head + 1) % buffer.size } // complete send the we're taken replacement from - if (token != null) - send!!.completeResumeSend(token!!) + if (resumed) + send!!.completeResumeSend() return result } // result is `ALREADY_SELECTED | E | POLL_FAILED | Closed` protected override fun pollSelectInternal(select: SelectInstance<*>): Any? { var send: Send? = null - var token: Any? = null + var success = false var result: Any? = null lock.withLock { val size = this.size @@ -204,8 +203,7 @@ internal open class ArrayChannel( when { failure == null -> { // polled successfully send = pollOp.result - token = pollOp.resumeToken - assert { token != null } + success = true replacement = send!!.pollResult break@loop } @@ -218,7 +216,7 @@ internal open class ArrayChannel( } failure is Closed<*> -> { send = failure - token = failure.tryResumeSend(null) + success = true replacement = failure break@loop } @@ -240,8 +238,8 @@ internal open class ArrayChannel( head = (head + 1) % buffer.size } // complete send the we're taken replacement from - if (token != null) - send!!.completeResumeSend(token!!) + if (success) + send!!.completeResumeSend() return result } diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 5318d14757..372c9e32dc 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -87,17 +87,13 @@ public interface SelectClause2 { public fun registerSelectClause2(select: SelectInstance, param: P, block: suspend (Q) -> R) } -@JvmField -@SharedImmutable -internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED") - /** * Internal representation of select instance. This instance is called _selected_ when * the clause to execute is already picked. * * @suppress **This is unstable API and it is subject to change.** */ -@InternalCoroutinesApi +@InternalCoroutinesApi // todo: sealed interface https://youtrack.jetbrains.com/issue/KT-22286 public interface SelectInstance { /** * Returns `true` when this [select] statement had already picked a clause to execute. @@ -111,11 +107,15 @@ public interface SelectInstance { /** * Tries to select this instance. Returns: - * * [SELECT_STARTED] on success, + * * [RESUME_TOKEN] on success, * * [RETRY_ATOMIC] on deadlock (needs retry, it is only possible when [otherOp] is not `null`) * * `null` on failure to select (already selected). * [otherOp] is not null when trying to rendezvous with this select from inside of another select. * In this case, [PrepareOp.finishPrepare] must be called before deciding on any value other than [RETRY_ATOMIC]. + * + * Note, that this method's actual return type is `Symbol?` but we cannot declare it as such, because this + * member is public, but [Symbol] is internal. When [SelectInstance] becomes a `sealed interface` + * (see KT-222860) we can declare this method as internal. */ public fun trySelectOther(otherOp: PrepareOp?): Any? @@ -370,7 +370,7 @@ internal class SelectBuilderImpl( override fun trySelect(): Boolean { val result = trySelectOther(null) return when { - result === SELECT_STARTED -> true + result === RESUME_TOKEN -> true result == null -> false else -> error("Unexpected trySelectIdempotent result $result") } @@ -460,7 +460,7 @@ internal class SelectBuilderImpl( */ // it is just like plain trySelect, but support idempotent start - // Returns SELECT_STARTED | RETRY_ATOMIC | null (when already selected) + // Returns RESUME_TOKEN | RETRY_ATOMIC | null (when already selected) override fun trySelectOther(otherOp: PrepareOp?): Any? { _state.loop { state -> // lock-free loop on state when { @@ -477,7 +477,7 @@ internal class SelectBuilderImpl( if (decision !== null) return decision } doAfterSelect() - return SELECT_STARTED + return RESUME_TOKEN } state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp // Found descriptor of ongoing operation while working in the context of other select operation @@ -512,7 +512,7 @@ internal class SelectBuilderImpl( } // otherwise -- already selected otherOp == null -> return null // already selected - state === otherOp.desc -> return SELECT_STARTED // was selected with this marker + state === otherOp.desc -> return RESUME_TOKEN // was selected with this marker else -> return null // selected with different marker } }