From 6807cf02da7389a699c11d0fe824b126fc360966 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 30 Aug 2019 13:01:19 +0300 Subject: [PATCH] Fix StackOverflowError with select expressions * onSend/onReceive clauses on the same channel: Instead of StackOverflowError we throw IllegalStateException and leave the channel in the original state. * Fix StackOverflowError in select with "opposite channels" stress-test. The fix is based on the sequential numbering of atomic select operations. A deadlock is detected and the operation with the lower sequential number is aborted and restarted (with a larger number). Fixes #504 Fixes #1411 --- .../kotlinx-coroutines-core.txt | 7 +- .../common/src/JobSupport.kt | 8 +- .../common/src/channels/AbstractChannel.kt | 89 ++++-- .../src/channels/ArrayBroadcastChannel.kt | 10 +- .../common/src/channels/ArrayChannel.kt | 12 +- .../src/channels/ConflatedBroadcastChannel.kt | 2 +- .../common/src/channels/ConflatedChannel.kt | 3 +- .../common/src/channels/LinkedListChannel.kt | 2 + .../common/src/internal/Atomic.kt | 50 ++- .../src/internal/LockFreeLinkedList.common.kt | 22 +- .../common/src/selects/Select.kt | 285 +++++++++++++++--- .../common/src/sync/Mutex.kt | 9 +- .../test/selects/SelectArrayChannelTest.kt | 4 +- .../test/selects/SelectBuilderImplTest.kt | 117 ------- .../selects/SelectRendezvousChannelTest.kt | 38 ++- .../js/src/internal/LinkedList.kt | 45 +-- .../jvm/src/internal/LockFreeLinkedList.kt | 145 +++++---- .../LockFreeLinkedListAtomicLFStressTest.kt | 8 + .../test/internal/LockFreeLinkedListTest.kt | 3 + .../test/selects/SelectChannelStressTest.kt | 4 +- .../selects/SelectDeadlockLFStressTest.kt | 101 +++++++ .../test/selects/SelectDeadlockStressTest.kt | 62 ++++ .../native/src/internal/LinkedList.kt | 45 +-- 23 files changed, 747 insertions(+), 324 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/test/selects/SelectBuilderImplTest.kt create mode 100644 
kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockStressTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 1d7954f53c..26c152c141 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -1050,7 +1050,9 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V public fun resumeWith (Ljava/lang/Object;)V - public fun trySelect (Ljava/lang/Object;)Z + public fun toString ()Ljava/lang/String; + public fun trySelect ()Z + public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/selects/SelectClause0 { @@ -1071,7 +1073,8 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance { public abstract fun isSelected ()Z public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V - public abstract fun trySelect (Ljava/lang/Object;)Z + public abstract fun trySelect ()Z + public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object; } public final class kotlinx/coroutines/selects/SelectKt { diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index d7ca5f6750..ca38cc156f 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ 
b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -553,7 +553,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (select.isSelected) return if (state !is Incomplete) { // already complete -- select result - if (select.trySelect(null)) { + if (select.trySelect()) { block.startCoroutineUnintercepted(select.completion) } return @@ -1181,7 +1181,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren if (select.isSelected) return if (state !is Incomplete) { // already complete -- select result - if (select.trySelect(null)) { + if (select.trySelect()) { if (state is CompletedExceptionally) { select.resumeSelectCancellableWithException(state.cause) } @@ -1362,7 +1362,7 @@ private class SelectJoinOnCompletion( private val block: suspend () -> R ) : JobNode(job) { override fun invoke(cause: Throwable?) { - if (select.trySelect(null)) + if (select.trySelect()) block.startCoroutineCancellable(select.completion) } override fun toString(): String = "SelectJoinOnCompletion[$select]" @@ -1374,7 +1374,7 @@ private class SelectAwaitOnCompletion( private val block: suspend (T) -> R ) : JobNode(job) { override fun invoke(cause: Throwable?) 
{ - if (select.trySelect(null)) + if (select.trySelect()) job.selectAwaitCompletion(select, block) } override fun toString(): String = "SelectAwaitOnCompletion[$select]" diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 30156ea694..a9845c5aa0 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -46,7 +46,7 @@ internal abstract class AbstractSendChannel : SendChannel { protected open fun offerInternal(element: E): Any { while (true) { val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED - val token = receive.tryResumeReceive(element, idempotent = null) + val token = receive.tryResumeReceive(element, null) if (token != null) { receive.completeResumeReceive(token) return receive.offerResult @@ -56,7 +56,7 @@ internal abstract class AbstractSendChannel : SendChannel { /** * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet. - * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`. + * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`. * @suppress **This is unstable API and it is subject to change.** */ protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any { @@ -362,10 +362,13 @@ internal abstract class AbstractSendChannel : SendChannel { else -> null } - override fun validatePrepared(node: ReceiveOrClosed): Boolean { - val token = node.tryResumeReceive(element, idempotent = this) ?: return false + @Suppress("UNCHECKED_CAST") + override fun onPrepare(prepareOp: PrepareOp): Any? 
{ + val affected = prepareOp.affected as ReceiveOrClosed // see "failure" impl + val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED + if (token === RETRY_ATOMIC) return RETRY_ATOMIC resumeToken = token - return true + return null } } @@ -398,6 +401,7 @@ internal abstract class AbstractSendChannel : SendChannel { when { offerResult === ALREADY_SELECTED -> return offerResult === OFFER_FAILED -> {} // retry + offerResult === RETRY_ATOMIC -> {} // retry offerResult === OFFER_SUCCESS -> { block.startCoroutineUnintercepted(receiver = this, completion = select.completion) return @@ -447,8 +451,8 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val select: SelectInstance, @JvmField val block: suspend (SendChannel) -> R ) : Send(), DisposableHandle { - override fun tryResumeSend(idempotent: Any?): Any? = - if (select.trySelect(idempotent)) SELECT_STARTED else null + override fun tryResumeSend(otherOp: PrepareOp?): Any? = + select.trySelectOther(otherOp) override fun completeResumeSend(token: Any) { assert { token === SELECT_STARTED } @@ -460,7 +464,7 @@ internal abstract class AbstractSendChannel : SendChannel { } override fun resumeSendClosed(closed: Closed<*>) { - if (select.trySelect(null)) + if (select.trySelect()) select.resumeSelectCancellableWithException(closed.sendException) } @@ -471,7 +475,7 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val element: E ) : Send() { override val pollResult: Any? get() = element - override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED + override fun tryResumeSend(otherOp: PrepareOp?): Any? = SEND_RESUMED.also { otherOp?.finishPrepare() } override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } } override fun resumeSendClosed(closed: Closed<*>) {} } @@ -505,7 +509,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel): Any? 
{ @@ -679,11 +683,13 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel return pollResult === POLL_FAILED -> {} // retry + pollResult === RETRY_ATOMIC -> {} // retry pollResult is Closed<*> -> throw recoverStackTrace(pollResult.receiveException) else -> { block.startCoroutineUnintercepted(pollResult as E, select.completion) @@ -733,9 +740,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel return pollResult === POLL_FAILED -> {} // retry + pollResult === RETRY_ATOMIC -> {} // retry pollResult is Closed<*> -> { if (pollResult.closeCause == null) { - if (select.trySelect(null)) + if (select.trySelect()) block.startCoroutineUnintercepted(null, select.completion) return } else { @@ -770,6 +778,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel return pollResult === POLL_FAILED -> {} // retry + pollResult === RETRY_ATOMIC -> {} // retry pollResult is Closed<*> -> { block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion) } @@ -894,7 +903,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) { when { @@ -910,15 +923,16 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel, @JvmField val cont: CancellableContinuation ) : Receive() { - override fun tryResumeReceive(value: E, idempotent: Any?): Any? { - val token = cont.tryResume(true, idempotent) + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? 
{ + otherOp?.finishPrepare() + val token = cont.tryResume(true, otherOp?.desc) if (token != null) { /* - When idempotent != null this invocation can be stale and we cannot directly update iterator.result + When otherOp != null this invocation can be stale and we cannot directly update iterator.result Instead, we save both token & result into a temporary IdempotentTokenValue object and set iterator result only in completeResumeReceive that is going to be invoked just once */ - if (idempotent != null) return IdempotentTokenValue(token, value) + if (otherOp != null) return IdempotentTokenValue(token, value) iterator.result = value } return token @@ -952,8 +966,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, @JvmField val receiveMode: Int ) : Receive(), DisposableHandle { - override fun tryResumeReceive(value: E, idempotent: Any?): Any? = - if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? { + val result = select.trySelectOther(otherOp) + return if (result === SELECT_STARTED) value ?: NULL_VALUE else result + } @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(token: Any) { @@ -962,7 +978,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) { - if (!select.trySelect(null)) return + if (!select.trySelect()) return when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException) RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed(closed.closeCause), select.completion) @@ -1009,10 +1025,6 @@ internal val POLL_FAILED: Any = Symbol("POLL_FAILED") @SharedImmutable internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED") -@JvmField -@SharedImmutable -internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED") - @JvmField @SharedImmutable internal val NULL_VALUE: Symbol = Symbol("NULL_VALUE") @@ -1036,7 +1048,11 @@ internal typealias Handler = (Throwable?) 
-> Unit */ internal abstract class Send : LockFreeLinkedListNode() { abstract val pollResult: Any? // E | Closed - abstract fun tryResumeSend(idempotent: Any?): Any? + // Returns: null - failure, + // RETRY_ATOMIC for retry (only when otherOp != null), + // otherwise token for completeResumeSend + // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC + abstract fun tryResumeSend(otherOp: PrepareOp?): Any? abstract fun completeResumeSend(token: Any) abstract fun resumeSendClosed(closed: Closed<*>) } @@ -1046,7 +1062,11 @@ internal abstract class Send : LockFreeLinkedListNode() { */ internal interface ReceiveOrClosed { val offerResult: Any // OFFER_SUCCESS | Closed - fun tryResumeReceive(value: E, idempotent: Any?): Any? + // Returns: null - failure, + // RETRY_ATOMIC for retry (only when otherOp != null), + // otherwise token for completeResumeReceive + // Must call otherOp?.finishPrepare() before deciding on result other than RETRY_ATOMIC + fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? fun completeResumeReceive(token: Any) } @@ -1058,7 +1078,10 @@ internal class SendElement( override val pollResult: Any?, @JvmField val cont: CancellableContinuation ) : Send() { - override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent) + override fun tryResumeSend(otherOp: PrepareOp?): Any? { + otherOp?.finishPrepare() + return cont.tryResume(Unit, otherOp?.desc) + } override fun completeResumeSend(token: Any) = cont.completeResume(token) override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException) override fun toString(): String = "SendElement($pollResult)" @@ -1075,9 +1098,9 @@ internal class Closed( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(idempotent: Any?): Any? = CLOSE_RESUMED + override fun tryResumeSend(otherOp: PrepareOp?): Any? 
= CLOSE_RESUMED.also { otherOp?.finishPrepare() } override fun completeResumeSend(token: Any) { assert { token === CLOSE_RESUMED } } - override fun tryResumeReceive(value: E, idempotent: Any?): Any? = CLOSE_RESUMED + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Any? = CLOSE_RESUMED.also { otherOp?.finishPrepare() } override fun completeResumeReceive(token: Any) { assert { token === CLOSE_RESUMED } } override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed[$closeCause]" diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt index 49fce272c3..d70a56c33b 100644 --- a/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayBroadcastChannel.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ package kotlinx.coroutines.channels @@ -105,7 +105,7 @@ internal class ArrayBroadcastChannel( val size = this.size if (size >= capacity) return OFFER_FAILED // let's try to select sending this element to buffer - if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock + if (!select.trySelect()) { // :todo: move trySelect completion outside of lock return ALREADY_SELECTED } val tail = this.tail @@ -163,7 +163,7 @@ internal class ArrayBroadcastChannel( while (true) { send = takeFirstSendOrPeekClosed() ?: break // when when no sender if (send is Closed<*>) break // break when closed for send - token = send!!.tryResumeSend(idempotent = null) + token = send!!.tryResumeSend(null) if (token != null) { // put sent element to the buffer buffer[(tail % capacity).toInt()] = (send as Send).pollResult @@ -242,7 +242,7 @@ internal class ArrayBroadcastChannel( // find a receiver for an element receive = takeFirstReceiveOrPeekClosed() ?: break // break when no one's receiving if (receive is Closed<*>) break // noting more to do if this sub already closed - token = receive.tryResumeReceive(result as E, idempotent = null) + token = receive.tryResumeReceive(result as E, null) if (token == null) continue // bail out here to next iteration (see for next receiver) val subHead = this.subHead this.subHead = subHead + 1 // retrieved element for this subscriber @@ -296,7 +296,7 @@ internal class ArrayBroadcastChannel( result === POLL_FAILED -> { /* just bail out of lock */ } else -> { // let's try to select receiving this element from buffer - if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock + if (!select.trySelect()) { // :todo: move trySelect completion outside of lock result = ALREADY_SELECTED } else { // update subHead after retrieiving element from buffer diff --git a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt index 96302dd106..d643610375 100644 
--- a/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ArrayChannel.kt @@ -62,7 +62,7 @@ internal open class ArrayChannel( this.size = size // restore size return receive!! } - token = receive!!.tryResumeReceive(element, idempotent = null) + token = receive!!.tryResumeReceive(element, null) if (token != null) { this.size = size // restore size return@withLock @@ -105,6 +105,7 @@ internal open class ArrayChannel( return@withLock } failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer + failure === RETRY_ATOMIC -> {} // retry failure === ALREADY_SELECTED || failure is Closed<*> -> { this.size = size // restore size return failure @@ -114,7 +115,7 @@ internal open class ArrayChannel( } } // let's try to select sending this element to buffer - if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock + if (!select.trySelect()) { // :todo: move trySelect completion outside of lock this.size = size // restore size return ALREADY_SELECTED } @@ -160,7 +161,7 @@ internal open class ArrayChannel( if (size == capacity) { loop@ while (true) { send = takeFirstSendOrPeekClosed() ?: break - token = send!!.tryResumeSend(idempotent = null) + token = send!!.tryResumeSend(null) if (token != null) { replacement = send!!.pollResult break@loop @@ -206,6 +207,7 @@ internal open class ArrayChannel( break@loop } failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer + failure === RETRY_ATOMIC -> {} // retry failure === ALREADY_SELECTED -> { this.size = size // restore size buffer[head] = result // restore head @@ -213,7 +215,7 @@ internal open class ArrayChannel( } failure is Closed<*> -> { send = failure - token = failure.tryResumeSend(idempotent = null) + token = failure.tryResumeSend(null) replacement = failure break@loop } @@ -226,7 +228,7 @@ internal open class ArrayChannel( buffer[(head + size) % buffer.size] = replacement } else { // failed to poll 
or is already closed --> let's try to select receiving this element from buffer - if (!select.trySelect(null)) { // :todo: move trySelect completion outside of lock + if (!select.trySelect()) { // :todo: move trySelect completion outside of lock this.size = size // restore size buffer[head] = result // restore head return ALREADY_SELECTED diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt index 61b1f7ae0d..3f15550962 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBroadcastChannel.kt @@ -273,7 +273,7 @@ public class ConflatedBroadcastChannel() : BroadcastChannel { } private fun registerSelectSend(select: SelectInstance, element: E, block: suspend (SendChannel) -> R) { - if (!select.trySelect(null)) return + if (!select.trySelect()) return offerInternal(element)?.let { select.resumeSelectCancellableWithException(it.sendException) return diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt index 21a18832a4..c04ccc4c39 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ package kotlinx.coroutines.channels @@ -85,6 +85,7 @@ internal open class ConflatedChannel : AbstractChannel() { result === ALREADY_SELECTED -> return ALREADY_SELECTED result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> {} // retry + result === RETRY_ATOMIC -> {} // retry result is Closed<*> -> return result else -> error("Invalid result $result") } diff --git a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt index 2a73930ee9..d925be1c50 100644 --- a/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.channels +import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* /** @@ -51,6 +52,7 @@ internal open class LinkedListChannel : AbstractChannel() { result === ALREADY_SELECTED -> return ALREADY_SELECTED result === OFFER_SUCCESS -> return OFFER_SUCCESS result === OFFER_FAILED -> {} // retry + result === RETRY_ATOMIC -> {} // retry result is Closed<*> -> return result else -> error("Invalid result $result") } diff --git a/kotlinx-coroutines-core/common/src/internal/Atomic.kt b/kotlinx-coroutines-core/common/src/internal/Atomic.kt index bc52815361..8a1185ae13 100644 --- a/kotlinx-coroutines-core/common/src/internal/Atomic.kt +++ b/kotlinx-coroutines-core/common/src/internal/Atomic.kt @@ -1,11 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal import kotlinx.atomicfu.atomic import kotlinx.coroutines.* +import kotlin.jvm.* /** * The most abstract operation that can be in process. 
Other threads observing an instance of this @@ -19,6 +20,20 @@ public abstract class OpDescriptor { * object that indicates the failure reason. */ abstract fun perform(affected: Any?): Any? + + /** + * Returns reference to atomic operation that this descriptor is a part of or `null` + * if not a part of any [AtomicOp]. + */ + abstract val atomicOp: AtomicOp<*>? + + override fun toString(): String = "$classSimpleName@$hexAddress" // debug + + fun isEarlierThan(that: OpDescriptor): Boolean { + val thisOp = atomicOp ?: return false + val thatOp = that.atomicOp ?: return false + return thisOp.opSequence < thatOp.opSequence + } } @SharedImmutable @@ -40,13 +55,27 @@ public abstract class AtomicOp : OpDescriptor() { val isDecided: Boolean get() = _consensus.value !== NO_DECISION - fun tryDecide(decision: Any?): Boolean { + /** + * Sequence number of this multi-word operation for deadlock resolution. + * An operation with lower number aborts itself with (using [RETRY_ATOMIC] error symbol) if it encounters + * the need to help the operation with higher sequence number and then restarts + * (using higher `opSequence` to ensure progress). + * Simple operations that cannot get into the deadlock always return zero here. + * + * See https://github.com/Kotlin/kotlinx.coroutines/issues/504 + */ + open val opSequence: Long get() = 0L + + override val atomicOp: AtomicOp<*> get() = this + + fun decide(decision: Any?): Any? { assert { decision !== NO_DECISION } - return _consensus.compareAndSet(NO_DECISION, decision) + val current = _consensus.value + if (current !== NO_DECISION) return current + if (_consensus.compareAndSet(NO_DECISION, decision)) return decision + return _consensus.value } - private fun decide(decision: Any?): Any? = if (tryDecide(decision)) decision else _consensus.value - abstract fun prepare(affected: T): Any? // `null` if Ok, or failure reason abstract fun complete(affected: T, failure: Any?) 
// failure != null if failed to prepare op @@ -59,7 +88,7 @@ public abstract class AtomicOp : OpDescriptor() { if (decision === NO_DECISION) { decision = decide(prepare(affected as T)) } - + // complete operation complete(affected as T, decision) return decision } @@ -71,6 +100,15 @@ public abstract class AtomicOp : OpDescriptor() { * @suppress **This is unstable API and it is subject to change.** */ public abstract class AtomicDesc { + lateinit var atomicOp: AtomicOp<*> // the reference to parent atomicOp, init when AtomicOp is created abstract fun prepare(op: AtomicOp<*>): Any? // returns `null` if prepared successfully abstract fun complete(op: AtomicOp<*>, failure: Any?) // decision == null if success } + +/** + * It is returned as an error by [AtomicOp] implementations when they detect potential deadlock + * using [AtomicOp.opSequence] numbers. + */ +@JvmField +@SharedImmutable +internal val RETRY_ATOMIC: Any = Symbol("RETRY_ATOMIC") diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index fc1c72f067..39dc1d2884 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines.internal +import kotlin.jvm.* + /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean @@ -49,7 +51,7 @@ public expect open class AddLastDesc( ) : AbstractAtomicDesc { val queue: LockFreeLinkedListNode val node: T - protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? 
+ override fun finishPrepare(prepareOp: PrepareOp) override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } @@ -57,8 +59,7 @@ public expect open class AddLastDesc( public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): AbstractAtomicDesc { val queue: LockFreeLinkedListNode public val result: T - protected open fun validatePrepared(node: T): Boolean - protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? + override fun finishPrepare(prepareOp: PrepareOp) final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } @@ -68,6 +69,19 @@ public expect abstract class AbstractAtomicDesc : AtomicDesc { final override fun complete(op: AtomicOp<*>, failure: Any?) protected open fun failure(affected: LockFreeLinkedListNode): Any? protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean - protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure + public abstract fun finishPrepare(prepareOp: PrepareOp) // non-null on failure + public open fun onPrepare(prepareOp: PrepareOp): Any? 
// non-null on failure protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } + +/** @suppress **This is unstable API and it is subject to change.** */ +public expect class PrepareOp: OpDescriptor { + val affected: LockFreeLinkedListNode + override val atomicOp: AtomicOp<*> + val desc: AbstractAtomicDesc + fun finishPrepare() +} + +@JvmField +@SharedImmutable +internal val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED") \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 4626fe1d38..eb793f4840 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -87,6 +87,10 @@ public interface SelectClause2 { public fun registerSelectClause2(select: SelectInstance, param: P, block: suspend (Q) -> R) } +@JvmField +@SharedImmutable +internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED") + /** * Internal representation of select instance. This instance is called _selected_ when * the clause to execute is already picked. @@ -101,12 +105,23 @@ public interface SelectInstance { public val isSelected: Boolean /** - * Tries to select this instance. + * Tries to select this instance. Returns `true` on success. + */ + public fun trySelect(): Boolean + + /** + * Tries to select this instance. Returns: + * * [SELECT_STARTED] on success, + * * [RETRY_ATOMIC] on deadlock (needs retry, it is only possible when [otherOp] is not `null`) + * * `null` on failure to select (already selected). + * [otherOp] is not null when trying to rendezvous with this select from inside of another select. + * In this case, [PrepareOp.finishPrepare] must be called before deciding on any value other than [RETRY_ATOMIC]. */ - public fun trySelect(idempotent: Any?): Boolean + public fun trySelectOther(otherOp: PrepareOp?): Any? /** * Performs action atomically with [trySelect]. 
+ * May return [RETRY_ATOMIC], caller shall retry with **fresh instance of desc**. */ public fun performAtomicTrySelect(desc: AtomicDesc): Any? @@ -189,11 +204,22 @@ private val UNDECIDED: Any = Symbol("UNDECIDED") @SharedImmutable private val RESUMED: Any = Symbol("RESUMED") +// Global counter of all atomic select operations for their deadlock resolution +// The separate internal class is work-around for Atomicfu's current implementation that creates public classes +// for static atomics +internal class SeqNumber { + private val number = atomic(1L) + fun next() = number.incrementAndGet() +} + +private val selectOpSequenceNumber = SeqNumber() + @PublishedApi internal class SelectBuilderImpl( private val uCont: Continuation // unintercepted delegate continuation ) : LockFreeLinkedListHead(), SelectBuilder, - SelectInstance, Continuation, CoroutineStackFrame { + SelectInstance, Continuation, CoroutineStackFrame +{ override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame @@ -234,9 +260,7 @@ internal class SelectBuilderImpl( _result.loop { result -> when { result === UNDECIDED -> if (_result.compareAndSet(UNDECIDED, value())) return - result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, - RESUMED - )) { + result === COROUTINE_SUSPENDED -> if (_result.compareAndSet(COROUTINE_SUSPENDED, RESUMED)) { block() return } @@ -290,29 +314,22 @@ internal class SelectBuilderImpl( private inner class SelectOnCancelling(job: Job) : JobCancellingNode(job) { // Note: may be invoked multiple times, but only the first trySelect succeeds anyway override fun invoke(cause: Throwable?) { - if (trySelect(null)) + if (trySelect()) resumeSelectCancellableWithException(job.getCancellationException()) } override fun toString(): String = "SelectOnCancelling[${this@SelectBuilderImpl}]" } - private val state: Any? 
get() { - _state.loop { state -> - if (state !is OpDescriptor) return state - state.perform(this) - } - } - @PublishedApi internal fun handleBuilderException(e: Throwable) { - if (trySelect(null)) { + if (trySelect()) { resumeWithException(e) } else if (e !is CancellationException) { /* * Cannot handle this exception -- builder was already resumed with a different exception, * so treat it as "unhandled exception". But only if it is not the completion reason * and it's not the cancellation. Otherwise, in the face of structured concurrency - * the same exception will be reported to theglobal exception handler. + * the same exception will be reported to the global exception handler. */ val result = getResult() if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) { @@ -321,7 +338,13 @@ internal class SelectBuilderImpl( } } - override val isSelected: Boolean get() = state !== this + override val isSelected: Boolean get() = _state.loop { state -> + when { + state === this -> return false + state is OpDescriptor -> state.perform(this) // help + else -> return true // already selected + } + } override fun disposeOnSelect(handle: DisposableHandle) { val node = DisposeNode(handle) @@ -342,40 +365,209 @@ internal class SelectBuilderImpl( } } - // it is just like start(), but support idempotent start - override fun trySelect(idempotent: Any?): Boolean { - assert { idempotent !is OpDescriptor } // "cannot use OpDescriptor as idempotent marker" - while (true) { // lock-free loop on state - val state = this.state + override fun trySelect(): Boolean { + val result = trySelectOther(null) + return when { + result === SELECT_STARTED -> true + result == null -> false + else -> error("Unexpected trySelectIdempotent result $result") + } + } + + /* + Diagram for rendezvous between two select operations: + + +---------+ +------------------------+ state(c) + | Channel | | SelectBuilderImpl(1) | -----------------------------------+ + +---------+ 
+------------------------+ | + | queue ^ | + V | select | + +---------+ next +------------------------+ next +--------------+ | + | LLHead | ------> | Send/ReceiveSelect(3) | -+----> | NextNode ... | | + +---------+ +------------------------+ | +--------------+ | + ^ ^ | next(b) ^ | + | affected | V | | + | +-----------------+ next | V + | | PrepareOp(6) | ----------+ +-----------------+ + | +-----------------+ <-------------------- | PairSelectOp(7) | + | | desc +-----------------+ + | V + | queue +----------------------+ + +------------------------- | TryPoll/OfferDesc(5) | + +----------------------+ + atomicOp | ^ + V | desc + +----------------------+ impl +---------------------+ + | SelectBuilderImpl(2) | <----- | AtomicSelectOp(4) | + +----------------------+ +---------------------+ + | state(a) ^ + | | + +----------------------------+ + + + 0. The first select operation SelectBuilderImpl(1) had already registered Send/ReceiveSelect(3) node + in the channel. + 1. The second select operation SelectBuilderImpl(2) is trying to rendezvous calling + performAtomicTrySelect(TryPoll/TryOfferDesc). + 2. A linked pair of AtomicSelectOp(4) and TryPoll/OfferDesc(5) is created to initiate this operation. + 3. AtomicSelectOp.prepareSelectOp installs a reference to AtomicSelectOp(4) in SelectBuilderImpl(2).state(a) + field. STARTING AT THIS MOMENT CONCURRENT HELPERS CAN DISCOVER AND TRY TO HELP PERFORM THIS OPERATION. + 4. Then TryPoll/OfferDesc.prepare discovers "affectedNode" for this operation as Send/ReceiveSelect(3) and + creates PrepareOp(6) that references it. It installs reference to PrepareOp(6) in Send/ReceiveSelect(3).next(b) + instead of its original next pointer that was stored in PrepareOp(6).next. + 5. PrepareOp(6).perform calls TryPoll/OfferDesc(5).onPrepare which validates that PrepareOp(6).affected node + is of the correct type and tries to secure ability to resume it by calling affected.tryResumeSend/Receive. 
+ Note, that different PrepareOp instances can be repeatedly created for different candidate nodes. If node is + found to be resumed/selected, then REMOVE_PREPARED result causes Send/ReceiveSelect(3).next change to be + undone and new PrepareOp is created with a different candidate node. Different concurrent helpers may end up + creating different PrepareOp instances, so it is important that they ultimately come to consensus about + the node on which to perform the operation. + 6. Send/ReceiveSelect(3).affected.tryResumeSend/Receive forwards this call to SelectBuilderImpl.trySelectOther, + passing it a reference to PrepareOp(6) as an indication of the other select instance rendezvous. + 7. SelectBuilderImpl.trySelectOther creates PairSelectOp(7) and installs it as SelectBuilderImpl(1).state(c) + to secure the state of the first builder and commit ability to make it selected for this operation. + 8. NOW THE RENDEZVOUS IS FULLY PREPARED via descriptors installed at + - SelectBuilderImpl(2).state(a) + - Send/ReceiveSelect(3).next(b) + - SelectBuilderImpl(1).state(c) + Any concurrent operation that is trying to access any of the select instances or the queue is going to help. + Any helper that helps AtomicSelectOp(4) calls TryPoll/OfferDesc(5).prepare which tries to determine + "affectedNode" but is bound to discover the same Send/ReceiveSelect(3) node that cannot become + non-first node until this operation completes (there are no insertions to the head of the queue!) + We have not yet decided to complete this operation, but we cannot ever decide to complete this operation + on any other node but Send/ReceiveSelect(3), so it is now safe to perform the next step. + 9. PairSelectOp(7).perform calls PrepareOp(6).finishPrepare which copies PrepareOp(6).affected and PrepareOp(6).next + to the corresponding TryPoll/OfferDesc(5) fields. + 10. PairSelectOp(7).perform calls AtomicSelectOp(4).decide to reach consensus on successful completion of this + operation.
This consensus is important in light of dead-lock resolution algorithm, because a stale helper + could have stumbled upon a higher-numbered atomic operation and had decided to abort this atomic operation, + reaching decision on RETRY_ATOMIC status of it. We cannot proceed with completion in this case and must abort, + all objects including AtomicSelectOp(4) will be dropped, reverting all the three updated pointers to + their original values and atomic operation will retry from scratch. + 11. NOW WITH SUCCESSFUL UPDATE OF AtomicSelectOp(4).consensus to null THE RENDEZVOUS IS COMMITTED. The rest + of the code proceeds to update: + - SelectBuilderImpl(1).state to TryPoll/OfferDesc(5) so that late helpers would know that we have + already successfully completed rendezvous. + - Send/ReceiveSelect(3).next to Removed(next) so that this node becomes marked as removed. + - SelectBuilderImpl(2).state to null to mark this select instance as selected. + + Note, that very late helper may try to perform this AtomicSelectOp(4) when it is already completed. + It can proceed as far as finding affected node, creating PrepareOp, installing this new PrepareOp into the + node's next pointer, but PrepareOp.perform checks that AtomicSelectOp(4) is already decided and undoes all + the preparations. + */ + + // it is just like plain trySelect, but support idempotent start + // Returns SELECT_STARTED | RETRY_ATOMIC | null (when already selected) + override fun trySelectOther(otherOp: PrepareOp?): Any? 
{ + _state.loop { state -> // lock-free loop on state when { + // Found initial state (not selected yet) -- try to make it selected state === this -> { - if (_state.compareAndSet(this, idempotent)) { - doAfterSelect() - return true + if (otherOp == null) { + // regular trySelect -- just mark as select + if (!_state.compareAndSet(this, null)) return@loop + } else { + // Rendezvous with another select instance -- install PairSelectOp + val pairSelectOp = PairSelectOp(otherOp) + if (!_state.compareAndSet(this, pairSelectOp)) return@loop + val decision = pairSelectOp.perform(this) + if (decision !== null) return decision } + doAfterSelect() + return SELECT_STARTED + } + state is OpDescriptor -> { // state is either AtomicSelectOp or PairSelectOp + // Found descriptor of ongoing operation while working in the context of other select operation + if (otherOp != null) { + val otherAtomicOp = otherOp.atomicOp + when { + // It is the same select instance + otherAtomicOp is AtomicSelectOp && otherAtomicOp.impl === this -> { + /* + * We cannot do state.perform(this) here and "help" it since it is the same + * select and we'll get StackOverflowError. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/1411 + * We cannot support this because select { ... } is an expression and its clauses + * have a result that shall be returned from the select. + */ + error("Cannot use matching select clauses on the same object") + } + // The other select (that is trying to proceed) had started earlier + otherAtomicOp.isEarlierThan(state) -> { + /** + * Abort to prevent deadlock by returning a failure to it. + * See https://github.com/Kotlin/kotlinx.coroutines/issues/504 + * The other select operation will receive a failure and will restart itself with a + * larger sequence number. This guarantees obstruction-freedom of this algorithm. 
+ */ + return RETRY_ATOMIC + } + } + } + // Otherwise (not a special descriptor) + state.perform(this) // help it } // otherwise -- already selected - idempotent == null -> return false // already selected - state === idempotent -> return true // was selected with this marker - else -> return false + otherOp == null -> return null // already selected + state === otherOp.desc -> return SELECT_STARTED // was selected with this marker + else -> return null // selected with different marker } } } + // The very last step of rendezvous between two select operations + private class PairSelectOp( + @JvmField val otherOp: PrepareOp + ) : OpDescriptor() { + override fun perform(affected: Any?): Any? { + val impl = affected as SelectBuilderImpl<*> + // here we are definitely not going to RETRY_ATOMIC, so + // we must finish preparation of another operation before attempting to reach decision to select + otherOp.finishPrepare() + val decision = otherOp.atomicOp.decide(null) // try decide for success of operation + val update: Any = if (decision == null) otherOp.desc else impl + impl._state.compareAndSet(this, update) + return decision + } + + override val atomicOp: AtomicOp<*>? + get() = otherOp.atomicOp + } + override fun performAtomicTrySelect(desc: AtomicDesc): Any? = - AtomicSelectOp(desc).perform(null) + AtomicSelectOp(this, desc).perform(null) + + override fun toString(): String { + val state = _state.value + return "SelectInstance(state=${if (state === this) "this" else state.toString()}, result=${_result.value})" + } - private inner class AtomicSelectOp( + private class AtomicSelectOp( + @JvmField val impl: SelectBuilderImpl<*>, @JvmField val desc: AtomicDesc ) : AtomicOp() { + // all select operations are totally ordered by their creating time using selectOpSequenceNumber + override val opSequence = selectOpSequenceNumber.next() + + init { + desc.atomicOp = this + } + override fun prepare(affected: Any?): Any? 
{ // only originator of operation makes preparation move of installing descriptor into this selector's state // helpers should never do it, or risk ruining progress when they come late if (affected == null) { // we are originator (affected reference is not null if helping) - prepareIfNotSelected()?.let { return it } + prepareSelectOp()?.let { return it } + } + try { + return desc.prepare(this) + } catch (e: Throwable) { + // undo prepareSelectOp on crash (for example if IllegalStateException is thrown) + if (affected == null) undoPrepare() + throw e } - return desc.prepare(this) } override fun complete(affected: Any?, failure: Any?) { @@ -383,13 +575,13 @@ internal class SelectBuilderImpl( desc.complete(this, failure) } - fun prepareIfNotSelected(): Any? { - _state.loop { state -> + private fun prepareSelectOp(): Any? { + impl._state.loop { state -> when { - state === this@AtomicSelectOp -> return null // already in progress - state is OpDescriptor -> state.perform(this@SelectBuilderImpl) // help - state === this@SelectBuilderImpl -> { - if (_state.compareAndSet(this@SelectBuilderImpl, this@AtomicSelectOp)) + state === this -> return null // already in progress + state is OpDescriptor -> state.perform(impl) // help + state === impl -> { + if (impl._state.compareAndSet(impl, this)) return null // success } else -> return ALREADY_SELECTED @@ -397,14 +589,21 @@ internal class SelectBuilderImpl( } } + // reverts the change done by prepareSelectOp + private fun undoPrepare() { + impl._state.compareAndSet(this, impl) + } + private fun completeSelect(failure: Any?)
{ val selectSuccess = failure == null - val update = if (selectSuccess) null else this@SelectBuilderImpl - if (_state.compareAndSet(this@AtomicSelectOp, update)) { + val update = if (selectSuccess) null else impl + if (impl._state.compareAndSet(this, update)) { if (selectSuccess) - doAfterSelect() + impl.doAfterSelect() } } + + override fun toString(): String = "AtomicSelectOp(sequence=$opSequence)" } override fun SelectClause0.invoke(block: suspend () -> R) { @@ -421,14 +620,14 @@ internal class SelectBuilderImpl( override fun onTimeout(timeMillis: Long, block: suspend () -> R) { if (timeMillis <= 0L) { - if (trySelect(null)) + if (trySelect()) block.startCoroutineUnintercepted(completion) return } val action = Runnable { // todo: we could have replaced startCoroutine with startCoroutineUndispatched // But we need a way to know that Delay.invokeOnTimeout had used the right thread - if (trySelect(null)) + if (trySelect()) block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch } disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action)) diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 3c72915379..f82d6ca8ff 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -240,6 +240,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { } failure === ALREADY_SELECTED -> return // already selected -- bail out failure === LOCK_FAIL -> {} // retry + failure === RETRY_ATOMIC -> {} // retry else -> error("performAtomicTrySelect(TryLockDesc) returned $failure") } } @@ -264,9 +265,9 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { @JvmField val owner: Any? 
) : AtomicDesc() { // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - private inner class PrepareOp(private val op: AtomicOp<*>) : OpDescriptor() { + private inner class PrepareOp(override val atomicOp: AtomicOp<*>) : OpDescriptor() { override fun perform(affected: Any?): Any? { - val update: Any = if (op.isDecided) EMPTY_UNLOCKED else op // restore if was already decided + val update: Any = if (atomicOp.isDecided) EMPTY_UNLOCKED else atomicOp // restore if was already decided (affected as MutexImpl)._state.compareAndSet(this, update) return null // ok } @@ -367,7 +368,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { @JvmField val select: SelectInstance, @JvmField val block: suspend (Mutex) -> R ) : LockWaiter(owner) { - override fun tryResumeLockWaiter(): Any? = if (select.trySelect(null)) SELECT_SUCCESS else null + override fun tryResumeLockWaiter(): Any? = if (select.trySelect()) SELECT_SUCCESS else null override fun completeResumeLockWaiter(token: Any) { assert { token === SELECT_SUCCESS } block.startCoroutine(receiver = mutex, completion = select.completion) @@ -379,6 +380,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { private class UnlockOp( @JvmField val queue: LockedQueue ) : OpDescriptor() { + override val atomicOp: AtomicOp<*>? get() = null + override fun perform(affected: Any?): Any? { /* Note: queue cannot change while this UnlockOp is in progress, so all concurrent attempts to diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt index ece95db19a..c9747c6fe8 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. 
Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.selects @@ -385,7 +385,7 @@ class SelectArrayChannelTest : TestBase() { // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion - if (!trySelect(null)) return + if (!trySelect()) return block.startCoroutineUnintercepted(this) } } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectBuilderImplTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectBuilderImplTest.kt deleted file mode 100644 index d231135992..0000000000 --- a/kotlinx-coroutines-core/common/test/selects/SelectBuilderImplTest.kt +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.selects - -import kotlinx.coroutines.* -import kotlin.coroutines.* -import kotlin.coroutines.intrinsics.* -import kotlin.test.* - -class SelectBuilderImplTest : TestBase() { - @Test - fun testIdempotentSelectResumeInline() { - var resumed = false - val delegate = object : Continuation { - override val context: CoroutineContext get() = EmptyCoroutineContext - override fun resumeWith(result: Result) { - check(result.getOrNull() == "OK") - resumed = true - } - } - val c = SelectBuilderImpl(delegate) - // still running builder - check(!c.isSelected) - check(c.trySelect("SELECT")) - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - c.completion.resume("OK") - check(!resumed) // still running builder, didn't invoke delegate - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - check(c.getResult() === "OK") // then builder returns - } - - @Test - fun testIdempotentSelectResumeSuspended() { - var resumed = false - val delegate = object : Continuation { - override val context: CoroutineContext get() = EmptyCoroutineContext - override fun resumeWith(result: Result) { - 
check(result.getOrNull() == "OK") - resumed = true - } - } - val c = SelectBuilderImpl(delegate) - check(c.getResult() === COROUTINE_SUSPENDED) // suspend first - check(!c.isSelected) - check(c.trySelect("SELECT")) - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - check(!resumed) - c.completion.resume("OK") - check(resumed) - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - } - - @Test - fun testIdempotentSelectResumeWithExceptionInline() { - var resumed = false - val delegate = object : Continuation { - override val context: CoroutineContext get() = EmptyCoroutineContext - override fun resumeWith(result: Result) { - check(result.exceptionOrNull() is TestException) - resumed = true - } - } - val c = SelectBuilderImpl(delegate) - // still running builder - check(!c.isSelected) - check(c.trySelect("SELECT")) - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - c.completion.resumeWithException(TestException()) - check(!resumed) // still running builder, didn't invoke delegate - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - try { - c.getResult() // the builder should throw exception - error("Failed") - } catch (e: Throwable) { - check(e is TestException) - } - } - - @Test - fun testIdempotentSelectResumeWithExceptionSuspended() { - var resumed = false - val delegate = object : Continuation { - override val context: CoroutineContext get() = EmptyCoroutineContext - override fun resumeWith(result: Result) { - check(result.exceptionOrNull() is TestException) - resumed = true - } - } - val c = SelectBuilderImpl(delegate) - check(c.getResult() === COROUTINE_SUSPENDED) // suspend first - check(!c.isSelected) - check(c.trySelect("SELECT")) - check(c.isSelected) - check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - check(!resumed) - c.completion.resumeWithException(TestException()) - check(resumed) - check(c.isSelected) - 
check(!c.trySelect("OTHER")) - check(c.trySelect("SELECT")) - } -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index 6072cc2cbb..e84514ea5b 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 @@ -429,7 +429,41 @@ class SelectRendezvousChannelTest : TestBase() { // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion - if (!trySelect(null)) return + if (!trySelect()) return block.startCoroutineUnintercepted(this) } + + @Test + fun testSelectSendAndReceive() = runTest { + val c = Channel() + assertFailsWith { + select { + c.onSend(1) { expectUnreached() } + c.onReceive { expectUnreached() } + } + } + checkNotBroken(c) + } + + @Test + fun testSelectReceiveAndSend() = runTest { + val c = Channel() + assertFailsWith { + select { + c.onReceive { expectUnreached() } + c.onSend(1) { expectUnreached() } + } + } + checkNotBroken(c) + } + + // makes sure the channel is not broken + private suspend fun checkNotBroken(c: Channel) { + coroutineScope { + launch { + c.send(42) + } + assertEquals(42, c.receive()) + } + } } diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt index 6050901058..7adc7a7865 100644 --- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt @@ -96,42 +96,41 @@ public actual open class AddLastDesc actual 
constructor( actual val queue: Node, actual val node: T ) : AbstractAtomicDesc() { - protected override val affectedNode: Node get() = queue._prev - protected actual override fun onPrepare(affected: Node, next: Node): Any? = null - protected override fun onComplete() = queue.addLast(node) - protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit + override val affectedNode: Node get() = queue._prev + actual override fun finishPrepare(prepareOp: PrepareOp) {} + override fun onComplete() = queue.addLast(node) + actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } /** @suppress **This is unstable API and it is subject to change.** */ public actual open class RemoveFirstDesc actual constructor( actual val queue: LockFreeLinkedListNode ) : AbstractAtomicDesc() { - @Suppress("UNCHECKED_CAST") - public actual val result: T get() = affectedNode as T - protected override val affectedNode: Node = queue.nextNode - protected actual open fun validatePrepared(node: T): Boolean = true - protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? 
{ - @Suppress("UNCHECKED_CAST") - validatePrepared(affectedNode as T) - return null - } - protected override fun onComplete() { queue.removeFirstOrNull() } - protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit + actual val result: T get() = affectedNode as T + override val affectedNode: Node = queue.nextNode + actual override fun finishPrepare(prepareOp: PrepareOp) {} + override fun onComplete() { queue.removeFirstOrNull() } + actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } /** @suppress **This is unstable API and it is subject to change.** */ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected abstract val affectedNode: Node - protected actual abstract fun onPrepare(affected: Node, next: Node): Any? + actual abstract fun finishPrepare(prepareOp: PrepareOp) protected abstract fun onComplete() + actual open fun onPrepare(prepareOp: PrepareOp): Any? { + finishPrepare(prepareOp) + return null + } + actual final override fun prepare(op: AtomicOp<*>): Any? { val affected = affectedNode - val next = affected._next val failure = failure(affected) if (failure != null) return failure - return onPrepare(affected, next) + @Suppress("UNCHECKED_CAST") + return onPrepare(PrepareOp(affected, this, op)) } actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() @@ -140,6 +139,16 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } +/** @suppress **This is unstable API and it is subject to change.** */ +public actual class PrepareOp( + actual val affected: LockFreeLinkedListNode, + actual val desc: AbstractAtomicDesc, + actual override val atomicOp: AtomicOp<*> +): OpDescriptor() { + override fun perform(affected: Any?): Any? 
= null + actual fun finishPrepare() {} +} + /** @suppress **This is unstable API and it is subject to change.** */ public open class LinkedListHead : LinkedListNode() { public val isEmpty get() = _next === this diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt index d3d168a427..9da237dc5f 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt @@ -21,14 +21,9 @@ internal const val FAILURE = 2 @PublishedApi internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") -@PublishedApi -internal val ALREADY_REMOVED: Any = Symbol("ALREADY_REMOVED") - @PublishedApi internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY") -private val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED") - /** @suppress **This is unstable API and it is subject to change.** */ public actual typealias RemoveFirstDesc = LockFreeLinkedListNode.RemoveFirstDesc @@ -38,6 +33,9 @@ public actual typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc /** @suppress **This is unstable API and it is subject to change.** */ public actual typealias AbstractAtomicDesc = LockFreeLinkedListNode.AbstractAtomicDesc +/** @suppress **This is unstable API and it is subject to change.** */ +public actual typealias PrepareOp = LockFreeLinkedListNode.PrepareOp + /** * Doubly-linked concurrent list node with remove support. * Based on paper @@ -298,13 +296,16 @@ public actual open class LockFreeLinkedListNode { assert { node._next.value === node && node._prev.value === node } } - final override fun takeAffectedNode(op: OpDescriptor): Node { + // Returns null when atomic op got into deadlock trying to help operation that started later + final override fun takeAffectedNode(op: OpDescriptor): Node? 
{ while (true) { val prev = queue._prev.value as Node // this sentinel node is never removed val next = prev._next.value if (next === queue) return prev // all is good -> linked properly if (next === op) return prev // all is good -> our operation descriptor is already there if (next is OpDescriptor) { // some other operation descriptor -> help & retry + if (op.isEarlierThan(next)) + return null // RETRY_ATOMIC next.perform(prev) continue } @@ -321,12 +322,11 @@ public actual open class LockFreeLinkedListNode { override fun retry(affected: Node, next: Any): Boolean = next !== queue - protected override fun onPrepare(affected: Node, next: Node): Any? { + override fun finishPrepare(prepareOp: PrepareOp) { // Note: onPrepare must use CAS to make sure the stale invocation is not // going to overwrite the previous decision on successful preparation. // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes - _affectedNode.compareAndSet(null, affected) - return null // always success + _affectedNode.compareAndSet(null, prepareOp.affected) } override fun updatedNext(affected: Node, next: Node): Any { @@ -351,7 +351,18 @@ public actual open class LockFreeLinkedListNode { @Suppress("UNCHECKED_CAST") public val result: T get() = affectedNode!! as T - final override fun takeAffectedNode(op: OpDescriptor): Node = queue.next as Node + final override fun takeAffectedNode(op: OpDescriptor): Node? { + queue._next.loop { next -> + if (next is OpDescriptor) { + if (op.isEarlierThan(next)) + return null // RETRY_ATOMIC + next.perform(queue) + } else { + return next as Node + } + } + } + final override val affectedNode: Node? get() = _affectedNode.value final override val originalNext: Node? get() = _originalNext.value @@ -359,83 +370,94 @@ public actual open class LockFreeLinkedListNode { protected override fun failure(affected: Node): Any? 
= if (affected === queue) LIST_EMPTY else null - // validate the resulting node (return false if it should be deleted) - protected open fun validatePrepared(node: T): Boolean = true // false means remove node & retry - final override fun retry(affected: Node, next: Any): Boolean { if (next !is Removed) return false affected.helpDelete() // must help delete, or loose lock-freedom return true } - @Suppress("UNCHECKED_CAST") - final override fun onPrepare(affected: Node, next: Node): Any? { - assert { affected !is LockFreeLinkedListHead } - if (!validatePrepared(affected as T)) return REMOVE_PREPARED - // Note: onPrepare must use CAS to make sure the stale invocation is not + override fun finishPrepare(prepareOp: PrepareOp) { + // Note: finishPrepare must use CAS to make sure the stale invocation is not // going to overwrite the previous decision on successful preparation. // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes - _affectedNode.compareAndSet(null, affected) - _originalNext.compareAndSet(null, next) - return null // ok + _affectedNode.compareAndSet(null, prepareOp.affected) + _originalNext.compareAndSet(null, prepareOp.next) } final override fun updatedNext(affected: Node, next: Node): Any = next.removed() final override fun finishOnSuccess(affected: Node, next: Node) = affected.finishRemove(next) } + // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation + // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) + public class PrepareOp( + @JvmField val affected: Node, + @JvmField val next: Node, + @JvmField val desc: AbstractAtomicDesc + ) : OpDescriptor() { + override val atomicOp: AtomicOp<*> get() = desc.atomicOp + + // Returns REMOVE_PREPARED or null (it makes decision on any failure) + override fun perform(affected: Any?): Any? 
{ + assert(affected === this.affected) + affected as Node // type assertion + val decision = desc.onPrepare(this) + if (decision === REMOVE_PREPARED) { + // remove element on failure -- do not mark as decided, will try another one + val removed = next.removed() + if (affected._next.compareAndSet(this, removed)) { + affected.helpDelete() + } + return REMOVE_PREPARED + } + val isDecided = if (decision != null) { + // some other logic failure, including RETRY_ATOMIC -- reach consensus on decision fail reason ASAP + atomicOp.decide(decision) + true // atomicOp.isDecided will be true as a result + } else { + atomicOp.isDecided // consult with current decision status like in Harris DCSS + } + val update: Any = if (isDecided) next else atomicOp // restore if decision was already reached + affected._next.compareAndSet(this, update) + return null + } + + public fun finishPrepare() = desc.finishPrepare(this) + + override fun toString(): String = "PrepareOp(op=$atomicOp)" + } + public abstract class AbstractAtomicDesc : AtomicDesc() { protected abstract val affectedNode: Node? protected abstract val originalNext: Node? - protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!! + protected open fun takeAffectedNode(op: OpDescriptor): Node? = affectedNode!! // null for RETRY_ATOMIC protected open fun failure(affected: Node): Any? = null // next: Node | Removed protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed - protected abstract fun onPrepare(affected: Node, next: Node): Any? 
// non-null on failure protected abstract fun updatedNext(affected: Node, next: Node): Any protected abstract fun finishOnSuccess(affected: Node, next: Node) - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation - // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) - private class PrepareOp( - @JvmField val next: Node, - @JvmField val op: AtomicOp, - @JvmField val desc: AbstractAtomicDesc - ) : OpDescriptor() { - override fun perform(affected: Any?): Any? { - affected as Node // type assertion - val decision = desc.onPrepare(affected, next) - if (decision != null) { - if (decision === REMOVE_PREPARED) { - // remove element on failure - val removed = next.removed() - if (affected._next.compareAndSet(this, removed)) { - affected.helpDelete() - } - } else { - // some other failure -- mark as decided - op.tryDecide(decision) - // undo preparations - affected._next.compareAndSet(this, next) - } - return decision - } - val update: Any = if (op.isDecided) next else op // restore if decision was already reached - affected._next.compareAndSet(this, update) - return null // ok - } + public abstract fun finishPrepare(prepareOp: PrepareOp) + + // non-null on failure + public open fun onPrepare(prepareOp: PrepareOp): Any? { + finishPrepare(prepareOp) + return null } @Suppress("UNCHECKED_CAST") final override fun prepare(op: AtomicOp<*>): Any? 
{ while (true) { // lock free loop on next - val affected = takeAffectedNode(op) + val affected = takeAffectedNode(op) ?: return RETRY_ATOMIC // read its original next pointer first val next = affected._next.value // then see if already reached consensus on overall operation if (next === op) return null // already in process of operation -- all is good if (op.isDecided) return null // already decided this operation -- go to next desc if (next is OpDescriptor) { - // some other operation is in process -- help it + // some other operation is in process + // if operation in progress (preparing or prepared) has higher sequence number -- abort our preparations + if (op.isEarlierThan(next)) + return RETRY_ATOMIC next.perform(affected) continue // and retry } @@ -443,12 +465,19 @@ public actual open class LockFreeLinkedListNode { val failure = failure(affected) if (failure != null) return failure // signal failure if (retry(affected, next)) continue // retry operation - val prepareOp = PrepareOp(next as Node, op as AtomicOp, this) + val prepareOp = PrepareOp(affected, next as Node, this) if (affected._next.compareAndSet(next, prepareOp)) { // prepared -- complete preparations - val prepFail = prepareOp.perform(affected) - if (prepFail === REMOVE_PREPARED) continue // retry - return prepFail + try { + val prepFail = prepareOp.perform(affected) + if (prepFail === REMOVE_PREPARED) continue // retry + assert { prepFail == null } + return null + } catch (e: Throwable) { + // Crashed during preparation (for example IllegalStateException) -- undo & rethrow + affected._next.compareAndSet(prepareOp, next) + throw e + } } } } diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt index af2de24e1a..20c2b5308c 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt +++
b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAtomicLFStressTest.kt @@ -122,6 +122,10 @@ class LockFreeLinkedListAtomicLFStressTest : TestBase() { val add1 = list1.describeAddLast(node1) val add2 = list2.describeAddLast(node2) val op = object : AtomicOp() { + init { + add1.atomicOp = this + add2.atomicOp = this + } override fun prepare(affected: Any?): Any? = add1.prepare(this) ?: add2.prepare(this) @@ -145,6 +149,10 @@ class LockFreeLinkedListAtomicLFStressTest : TestBase() { val remove1 = list1.describeRemoveFirst() val remove2 = list2.describeRemoveFirst() val op = object : AtomicOp() { + init { + remove1.atomicOp = this + remove2.atomicOp = this + } override fun prepare(affected: Any?): Any? = remove1.prepare(this) ?: remove2.prepare(this) diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt index 9238681e8b..9de11f792e 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt @@ -63,6 +63,9 @@ class LockFreeLinkedListTest { private fun single(part: AtomicDesc) { val operation = object : AtomicOp() { + init { + part.atomicOp = this + } override fun prepare(affected: Any?): Any? = part.prepare(this) override fun complete(affected: Any?, failure: Any?) = part.complete(this, failure) } diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectChannelStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectChannelStressTest.kt index 380ec5e84d..200cdc09b0 100644 --- a/kotlinx-coroutines-core/jvm/test/selects/SelectChannelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectChannelStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 
*/ package kotlinx.coroutines.selects @@ -70,7 +70,7 @@ class SelectChannelStressTest: TestBase() { internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion - if (!trySelect(null)) return + if (!trySelect()) return block.startCoroutineUnintercepted(this) } } diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt new file mode 100644 index 0000000000..4497bec5b9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockLFStressTest.kt @@ -0,0 +1,101 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.junit.* +import org.junit.Ignore +import org.junit.Test +import kotlin.math.* +import kotlin.test.* + +/** + * A stress-test on lock-freedom of select sending/receiving into opposite channels. 
+ */ +class SelectDeadlockLFStressTest : TestBase() { + private val env = LockFreedomTestEnvironment("SelectDeadlockLFStressTest", allowSuspendedThreads = 1) + private val nSeconds = 5 * stressTestMultiplier + + private val c1 = Channel() + private val c2 = Channel() + + @Test + fun testLockFreedom() = testScenarios( + "s1r2", + "s2r1", + "r1s2", + "r2s1" + ) + + private fun testScenarios(vararg scenarios: String) { + env.onCompletion { + c1.cancel(TestCompleted()) + c2.cancel(TestCompleted()) + } + val t = scenarios.mapIndexed { i, scenario -> + val idx = i + 1L + TestDef(idx, "$idx [$scenario]", scenario) + } + t.forEach { it.test() } + env.performTest(nSeconds) { + t.forEach { println(it) } + } + } + + private inner class TestDef( + var sendIndex: Long = 0L, + val name: String, + scenario: String + ) { + var receiveIndex = 0L + + val clauses: List.() -> Unit> = ArrayList.() -> Unit>().apply { + require(scenario.length % 2 == 0) + for (i in scenario.indices step 2) { + val ch = when (val c = scenario[i + 1]) { + '1' -> c1 + '2' -> c2 + else -> error("Channel '$c'") + } + val clause = when (val op = scenario[i]) { + 's' -> fun SelectBuilder.() { sendClause(ch) } + 'r' -> fun SelectBuilder.() { receiveClause(ch) } + else -> error("Operation '$op'") + } + add(clause) + } + } + + fun test() = env.testThread(name) { + doSendReceive() + } + + private suspend fun doSendReceive() { + try { + select { + for (clause in clauses) clause() + } + } catch (e: TestCompleted) { + assertTrue(env.isCompleted) + } + } + + private fun SelectBuilder.sendClause(c: Channel) = + c.onSend(sendIndex) { + sendIndex += 4L + } + + private fun SelectBuilder.receiveClause(c: Channel) = + c.onReceive { i -> + receiveIndex = max(i, receiveIndex) + } + + override fun toString(): String = "$name: send=$sendIndex, received=$receiveIndex" + } + + private class TestCompleted : CancellationException() +} \ No newline at end of file diff --git 
a/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockStressTest.kt new file mode 100644 index 0000000000..d8d4b228c4 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectDeadlockStressTest.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +/** + * A simple stress-test that does select sending/receiving into opposite channels to ensure that they + * don't deadlock. See https://github.com/Kotlin/kotlinx.coroutines/issues/504 + */ +class SelectDeadlockStressTest : TestBase() { + private val pool = newFixedThreadPoolContext(2, "SelectDeadlockStressTest") + private val nSeconds = 3 * stressTestMultiplier + + @After + fun tearDown() { + pool.close() + } + + @Test + fun testStress() = runTest { + val c1 = Channel() + val c2 = Channel() + val s1 = Stats() + val s2 = Stats() + launchSendReceive(c1, c2, s1) + launchSendReceive(c2, c1, s2) + for (i in 1..nSeconds) { + delay(1000) + println("$i: First: $s1; Second: $s2") + } + coroutineContext.cancelChildren() + } + + private class Stats { + var sendIndex = 0L + var receiveIndex = 0L + + override fun toString(): String = "send=$sendIndex, received=$receiveIndex" + } + + private fun CoroutineScope.launchSendReceive(c1: Channel, c2: Channel, s: Stats) = launch(pool) { + while (true) { + if (s.sendIndex % 1000 == 0L) yield() + select { + c1.onSend(s.sendIndex) { + s.sendIndex++ + } + c2.onReceive { i -> + assertEquals(s.receiveIndex, i) + s.receiveIndex++ + } + } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt index 07fe1a06c7..bcdd0e8377 100644 --- 
a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt @@ -94,42 +94,41 @@ public actual open class AddLastDesc actual constructor( actual val queue: Node, actual val node: T ) : AbstractAtomicDesc() { - protected override val affectedNode: Node get() = queue._prev - protected actual override fun onPrepare(affected: Node, next: Node): Any? = null - protected override fun onComplete() = queue.addLast(node) - protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit + override val affectedNode: Node get() = queue._prev + actual override fun finishPrepare(prepareOp: PrepareOp) {} + override fun onComplete() = queue.addLast(node) + actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } /** @suppress **This is unstable API and it is subject to change.** */ public actual open class RemoveFirstDesc actual constructor( actual val queue: LockFreeLinkedListNode ) : AbstractAtomicDesc() { - @Suppress("UNCHECKED_CAST") - public actual val result: T get() = affectedNode as T - protected override val affectedNode: Node = queue.nextNode - protected actual open fun validatePrepared(node: T): Boolean = true - protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? 
{ - @Suppress("UNCHECKED_CAST") - validatePrepared(affectedNode as T) - return null - } - protected override fun onComplete() { queue.removeFirstOrNull() } - protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit + actual val result: T get() = affectedNode as T + override val affectedNode: Node = queue.nextNode + actual override fun finishPrepare(prepareOp: PrepareOp) {} + override fun onComplete() { queue.removeFirstOrNull() } + actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } /** @suppress **This is unstable API and it is subject to change.** */ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected abstract val affectedNode: Node - protected actual abstract fun onPrepare(affected: Node, next: Node): Any? + actual abstract fun finishPrepare(prepareOp: PrepareOp) protected abstract fun onComplete() + actual open fun onPrepare(prepareOp: PrepareOp): Any? { + finishPrepare(prepareOp) + return null + } + actual final override fun prepare(op: AtomicOp<*>): Any? { val affected = affectedNode - val next = affected._next val failure = failure(affected) if (failure != null) return failure - return onPrepare(affected, next) + @Suppress("UNCHECKED_CAST") + return onPrepare(PrepareOp(affected, this, op)) } actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() @@ -138,6 +137,16 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } +/** @suppress **This is unstable API and it is subject to change.** */ +public actual class PrepareOp( + actual val affected: LockFreeLinkedListNode, + actual val desc: AbstractAtomicDesc, + actual override val atomicOp: AtomicOp<*> +): OpDescriptor() { + override fun perform(affected: Any?): Any? 
= null + actual fun finishPrepare() {} +} + /** @suppress **This is unstable API and it is subject to change.** */ public open class LinkedListHead : LinkedListNode() { public val isEmpty get() = _next === this