diff --git a/kotlinx-coroutines-core/common/src/internal/Atomic.kt b/kotlinx-coroutines-core/common/src/internal/Atomic.kt index 127b70f7e8..ff4320e0b3 100644 --- a/kotlinx-coroutines-core/common/src/internal/Atomic.kt +++ b/kotlinx-coroutines-core/common/src/internal/Atomic.kt @@ -29,12 +29,6 @@ public abstract class OpDescriptor { abstract val atomicOp: AtomicOp<*>? override fun toString(): String = "$classSimpleName@$hexAddress" // debug - - fun isEarlierThan(that: OpDescriptor): Boolean { - val thisOp = atomicOp ?: return false - val thatOp = that.atomicOp ?: return false - return thisOp.opSequence < thatOp.opSequence - } } @JvmField @@ -55,25 +49,9 @@ internal val NO_DECISION: Any = Symbol("NO_DECISION") public abstract class AtomicOp : OpDescriptor() { private val _consensus = atomic(NO_DECISION) - // Returns NO_DECISION when there is not decision yet - val consensus: Any? get() = _consensus.value - - val isDecided: Boolean get() = _consensus.value !== NO_DECISION - - /** - * Sequence number of this multi-word operation for deadlock resolution. - * An operation with lower number aborts itself with (using [RETRY_ATOMIC] error symbol) if it encounters - * the need to help the operation with higher sequence number and then restarts - * (using higher `opSequence` to ensure progress). - * Simple operations that cannot get into the deadlock always return zero here. - * - * See https://github.com/Kotlin/kotlinx.coroutines/issues/504 - */ - open val opSequence: Long get() = 0L - override val atomicOp: AtomicOp<*> get() = this - fun decide(decision: Any?): Any? { + private fun decide(decision: Any?): Any? { assert { decision !== NO_DECISION } val current = _consensus.value if (current !== NO_DECISION) return current @@ -98,21 +76,3 @@ public abstract class AtomicOp : OpDescriptor() { return decision } } - -/** - * A part of multi-step atomic operation [AtomicOp]. - * - * @suppress **This is unstable API and it is subject to change.** - */ -public abstract class AtomicDesc { - lateinit var atomicOp: AtomicOp<*> // the reference to parent atomicOp, init when AtomicOp is created - abstract fun prepare(op: AtomicOp<*>): Any? // returns `null` if prepared successfully - abstract fun complete(op: AtomicOp<*>, failure: Any?) // decision == null if success -} - -/** - * It is returned as an error by [AtomicOp] implementations when they detect potential deadlock - * using [AtomicOp.opSequence] numbers. - */ -@JvmField -internal val RETRY_ATOMIC: Any = Symbol("RETRY_ATOMIC") diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 26fef8c012..121cdedc9c 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -16,27 +16,8 @@ public expect open class LockFreeLinkedListNode() { public fun addLast(node: LockFreeLinkedListNode) public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean - public inline fun addLastIfPrev( - node: LockFreeLinkedListNode, - predicate: (LockFreeLinkedListNode) -> Boolean - ): Boolean - - public inline fun addLastIfPrevAndIf( - node: LockFreeLinkedListNode, - predicate: (LockFreeLinkedListNode) -> Boolean, // prev node predicate - crossinline condition: () -> Boolean // atomically checked condition - ): Boolean - public open fun remove(): Boolean - /** - * Helps fully finish [remove] operation, must be invoked after [remove] if needed. - * Ensures that traversing the list via prev pointers sees this node as removed. - * No-op on JS - */ - public fun helpRemove() - public fun removeFirstOrNull(): LockFreeLinkedListNode? - public inline fun removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? } /** @suppress **This is unstable API and it is subject to change.** */ @@ -45,45 +26,3 @@ public expect open class LockFreeLinkedListHead() : LockFreeLinkedListNode { public inline fun forEach(block: (T) -> Unit) public final override fun remove(): Nothing } - -/** @suppress **This is unstable API and it is subject to change.** */ -public expect open class AddLastDesc( - queue: LockFreeLinkedListNode, - node: T -) : AbstractAtomicDesc { - val queue: LockFreeLinkedListNode - val node: T - override fun finishPrepare(prepareOp: PrepareOp) - override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): AbstractAtomicDesc { - val queue: LockFreeLinkedListNode - public val result: T - override fun finishPrepare(prepareOp: PrepareOp) - final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public expect abstract class AbstractAtomicDesc : AtomicDesc { - final override fun prepare(op: AtomicOp<*>): Any? - final override fun complete(op: AtomicOp<*>, failure: Any?) - protected open fun failure(affected: LockFreeLinkedListNode): Any? - protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean - public abstract fun finishPrepare(prepareOp: PrepareOp) // non-null on failure - public open fun onPrepare(prepareOp: PrepareOp): Any? // non-null on failure - public open fun onRemoved(affected: LockFreeLinkedListNode) // non-null on failure - protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public expect class PrepareOp: OpDescriptor { - val affected: LockFreeLinkedListNode - override val atomicOp: AtomicOp<*> - val desc: AbstractAtomicDesc - fun finishPrepare() -} - -@JvmField -internal val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED") diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt index f02bd09a66..0faa79b3ae 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt @@ -166,7 +166,6 @@ class ChannelUndeliveredElementFailureTest : TestBase() { fun testSendDropOldestInvokeHandlerConflated() = runTest(expected = { it is UndeliveredElementException }) { val channel = Channel(Channel.CONFLATED, onUndeliveredElement = { finish(2) - println(TestException().stackTraceToString()) throw TestException() }) channel.send(42) diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index c9a1b61261..00888499c6 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -23,21 +23,6 @@ internal const val FAILURE: Int = 2 @PublishedApi internal val CONDITION_FALSE: Any = Symbol("CONDITION_FALSE") -@PublishedApi -internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY") - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual typealias RemoveFirstDesc = LockFreeLinkedListNode.RemoveFirstDesc - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual typealias AbstractAtomicDesc = LockFreeLinkedListNode.AbstractAtomicDesc - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual typealias PrepareOp = LockFreeLinkedListNode.PrepareOp - /** * Doubly-linked concurrent list node with remove support. * Based on paper @@ -142,8 +127,6 @@ public actual open class LockFreeLinkedListNode { } } - public fun describeAddLast(node: T): AddLastDesc = AddLastDesc(this, node) - /** * Adds last item to this list atomically if the [condition] is true. */ @@ -158,30 +141,6 @@ public actual open class LockFreeLinkedListNode { } } - public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { - while (true) { // lock-free loop on prev.next - val prev = prevNode // sentinel node is never removed, so prev is always defined - if (!predicate(prev)) return false - if (prev.addNext(node, this)) return true - } - } - - public actual inline fun addLastIfPrevAndIf( - node: Node, - predicate: (Node) -> Boolean, // prev node predicate - crossinline condition: () -> Boolean // atomically checked condition - ): Boolean { - val condAdd = makeCondAddOp(node, condition) - while (true) { // lock-free loop on prev.next - val prev = prevNode // sentinel node is never removed, so prev is always defined - if (!predicate(prev)) return false - when (prev.tryCondAddNext(node, this, condAdd)) { - SUCCESS -> return true - FAILURE -> return false - } - } - } - // ------ addXXX util ------ /** @@ -236,7 +195,6 @@ public actual open class LockFreeLinkedListNode { * * **Note**: Invocation of this operation does not guarantee that remove was actually complete if result was `false`. * In particular, invoking [nextNode].[prevNode] might still return this node even though it is "already removed". - * Invoke [helpRemove] to make sure that remove was completed. */ public actual open fun remove(): Boolean = removeOrNext() == null @@ -257,263 +215,9 @@ public actual open class LockFreeLinkedListNode { } } - // Helps with removal of this node - public actual fun helpRemove() { - // Note: this node must be already removed - (next as Removed).ref.helpRemovePrev() - } - - // Helps with removal of nodes that are previous to this - @PublishedApi - internal fun helpRemovePrev() { - // We need to call correctPrev on a non-removed node to ensure progress, since correctPrev bails out when - // called on a removed node. There's always at least one non-removed node (list head). - var node = this - while (true) { - val next = node.next - if (next !is Removed) break - node = next.ref - } - // Found a non-removed node - node.correctPrev(null) - } - - public actual fun removeFirstOrNull(): Node? { - while (true) { // try to linearize - val first = next as Node - if (first === this) return null - if (first.remove()) return first - first.helpRemove() // must help remove to ensure lock-freedom - } - } - - public fun describeRemoveFirst(): RemoveFirstDesc = RemoveFirstDesc(this) - - // just peek at item when predicate is true - public actual inline fun removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? { - while (true) { - val first = this.next as Node - if (first === this) return null // got list head -- nothing to remove - if (first !is T) return null - if (predicate(first)) { - // check for removal of the current node to make sure "peek" operation is linearizable - if (!first.isRemoved) return first - } - val next = first.removeOrNext() - if (next === null) return first // removed successfully -- return it - // help and start from the beginning - next.helpRemovePrev() - } - } - - // ------ multi-word atomic operations helpers ------ - - public open class AddLastDesc constructor( - @JvmField val queue: Node, - @JvmField val node: T - ) : AbstractAtomicDesc() { - init { - // require freshly allocated node here - assert { node._next.value === node && node._prev.value === node } - } - - // Returns null when atomic op got into deadlock trying to help operation that started later (RETRY_ATOMIC) - final override fun takeAffectedNode(op: OpDescriptor): Node? = - queue.correctPrev(op) // queue head is never removed, so null result can only mean RETRY_ATOMIC - - private val _affectedNode = atomic(null) - final override val affectedNode: Node? get() = _affectedNode.value - final override val originalNext: Node get() = queue - - override fun retry(affected: Node, next: Any): Boolean = next !== queue - - override fun finishPrepare(prepareOp: PrepareOp) { - // Note: onPrepare must use CAS to make sure the stale invocation is not - // going to overwrite the previous decision on successful preparation. - // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes - _affectedNode.compareAndSet(null, prepareOp.affected) - } - - override fun updatedNext(affected: Node, next: Node): Any { - // it is invoked only on successfully completion of operation, but this invocation can be stale, - // so we must use CAS to set both prev & next pointers - node._prev.compareAndSet(node, affected) - node._next.compareAndSet(node, queue) - return node - } - - override fun finishOnSuccess(affected: Node, next: Node) { - node.finishAdd(queue) - } - } - - public open class RemoveFirstDesc( - @JvmField val queue: Node - ) : AbstractAtomicDesc() { - private val _affectedNode = atomic(null) - private val _originalNext = atomic(null) - - @Suppress("UNCHECKED_CAST") - public val result: T get() = affectedNode!! as T - - final override fun takeAffectedNode(op: OpDescriptor): Node? { - queue._next.loop { next -> - if (next is OpDescriptor) { - if (op.isEarlierThan(next)) - return null // RETRY_ATOMIC - next.perform(queue) - } else { - return next as Node - } - } - } - - final override val affectedNode: Node? get() = _affectedNode.value - final override val originalNext: Node? get() = _originalNext.value - - // check node predicates here, must signal failure if affect is not of type T - protected override fun failure(affected: Node): Any? = - if (affected === queue) LIST_EMPTY else null - - final override fun retry(affected: Node, next: Any): Boolean { - if (next !is Removed) return false - next.ref.helpRemovePrev() // must help delete to ensure lock-freedom - return true - } - - override fun finishPrepare(prepareOp: PrepareOp) { - // Note: finishPrepare must use CAS to make sure the stale invocation is not - // going to overwrite the previous decision on successful preparation. - // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes - _affectedNode.compareAndSet(null, prepareOp.affected) - _originalNext.compareAndSet(null, prepareOp.next) - } - - final override fun updatedNext(affected: Node, next: Node): Any = next.removed() - - final override fun finishOnSuccess(affected: Node, next: Node) { - // Complete removal operation here. It bails out if next node is also removed. It becomes - // responsibility of the next's removes to call correctPrev which would help fix all the links. - next.correctPrev(null) - } - } - // This is Harris's RDCSS (Restricted Double-Compare Single Swap) operation // It inserts "op" descriptor of when "op" status is still undecided (rolls back otherwise) - public class PrepareOp( - @JvmField val affected: Node, - @JvmField val next: Node, - @JvmField val desc: AbstractAtomicDesc - ) : OpDescriptor() { - override val atomicOp: AtomicOp<*> get() = desc.atomicOp - - // Returns REMOVE_PREPARED or null (it makes decision on any failure) - override fun perform(affected: Any?): Any? { - assert { affected === this.affected } - affected as Node // type assertion - val decision = desc.onPrepare(this) - if (decision === REMOVE_PREPARED) { - // remove element on failure -- do not mark as decided, will try another one - val next = this.next - val removed = next.removed() - if (affected._next.compareAndSet(this, removed)) { - // The element was actually removed - desc.onRemoved(affected) - // Complete removal operation here. It bails out if next node is also removed and it becomes - // responsibility of the next's removes to call correctPrev which would help fix all the links. - next.correctPrev(null) - } - return REMOVE_PREPARED - } - // We need to ensure progress even if it operation result consensus was already decided - val consensus = if (decision != null) { - // some other logic failure, including RETRY_ATOMIC -- reach consensus on decision fail reason ASAP - atomicOp.decide(decision) - } else { - atomicOp.consensus // consult with current decision status like in Harris DCSS - } - val update: Any = when { - consensus === NO_DECISION -> atomicOp // desc.onPrepare returned null -> start doing atomic op - consensus == null -> desc.updatedNext(affected, next) // move forward if consensus on success - else -> next // roll back if consensus if failure - } - affected._next.compareAndSet(this, update) - return null - } - - public fun finishPrepare(): Unit = desc.finishPrepare(this) - - override fun toString(): String = "PrepareOp(op=$atomicOp)" - } - - public abstract class AbstractAtomicDesc : AtomicDesc() { - protected abstract val affectedNode: Node? - protected abstract val originalNext: Node? - protected open fun takeAffectedNode(op: OpDescriptor): Node? = affectedNode!! // null for RETRY_ATOMIC - protected open fun failure(affected: Node): Any? = null // next: Node | Removed - protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed - protected abstract fun finishOnSuccess(affected: Node, next: Node) - - public abstract fun updatedNext(affected: Node, next: Node): Any - - public abstract fun finishPrepare(prepareOp: PrepareOp) - - // non-null on failure - public open fun onPrepare(prepareOp: PrepareOp): Any? { - finishPrepare(prepareOp) - return null - } - public open fun onRemoved(affected: Node) {} // called once when node was prepared & later removed - - @Suppress("UNCHECKED_CAST") - final override fun prepare(op: AtomicOp<*>): Any? { - while (true) { // lock free loop on next - val affected = takeAffectedNode(op) ?: return RETRY_ATOMIC - // read its original next pointer first - val next = affected._next.value - // then see if already reached consensus on overall operation - if (next === op) return null // already in process of operation -- all is good - if (op.isDecided) return null // already decided this operation -- go to next desc - if (next is OpDescriptor) { - // some other operation is in process - // if operation in progress (preparing or prepared) has higher sequence number -- abort our preparations - if (op.isEarlierThan(next)) - return RETRY_ATOMIC - next.perform(affected) - continue // and retry - } - // next: Node | Removed - val failure = failure(affected) - if (failure != null) return failure // signal failure - if (retry(affected, next)) continue // retry operation - val prepareOp = PrepareOp(affected, next as Node, this) - if (affected._next.compareAndSet(next, prepareOp)) { - // prepared -- complete preparations - try { - val prepFail = prepareOp.perform(affected) - if (prepFail === REMOVE_PREPARED) continue // retry - assert { prepFail == null } - return null - } catch (e: Throwable) { - // Crashed during preparation (for example IllegalStateExpception) -- undo & rethrow - affected._next.compareAndSet(prepareOp, next) - throw e - } - } - } - } - - final override fun complete(op: AtomicOp<*>, failure: Any?) { - val success = failure == null - val affectedNode = affectedNode ?: run { assert { !success }; return } - val originalNext = originalNext ?: run { assert { !success }; return } - val update = if (success) updatedNext(affectedNode, originalNext) else originalNext - if (affectedNode._next.compareAndSet(op, update)) { - if (success) finishOnSuccess(affectedNode, originalNext) - } - } - } // ------ other helpers ------ @@ -562,9 +266,6 @@ public actual open class LockFreeLinkedListNode { * * When this node is removed. In this case there is no need to waste time on corrections, because * remover of this node will ultimately call [correctPrev] on the next node and that will fix all * the links from this node, too. - * * When [op] descriptor is not `null` and operation descriptor that is [OpDescriptor.isEarlierThan] - * that current [op] is found while traversing the list. This `null` result will be translated - * by callers to [RETRY_ATOMIC]. */ private tailrec fun correctPrev(op: OpDescriptor?): Node? { val oldPrev = _prev.value @@ -587,8 +288,6 @@ public actual open class LockFreeLinkedListNode { this.isRemoved -> return null // nothing to do, this node was removed, bail out asap to save time prevNext === op -> return prev // part of the same op -- don't recurse, didn't correct prev prevNext is OpDescriptor -> { // help & retry - if (op != null && op.isEarlierThan(prevNext)) - return null // RETRY_ATOMIC prevNext.perform(prev) return correctPrev(op) // retry from scratch } diff --git a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt deleted file mode 100644 index 1e08c35a8e..0000000000 --- a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlin.test.* - -class LockFreeLinkedListTest { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - - @Test - fun testSimpleAddLast() { - val list = LockFreeLinkedListHead() - assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this) } - assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this) } - assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this) } - assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this) } - assertContents(list, 1, 2, 3, 4) - assertTrue(n1.remove()) - assertContents(list, 2, 3, 4) - assertTrue(n3.remove()) - assertContents(list, 2, 4) - assertTrue(n4.remove()) - assertContents(list, 2) - assertTrue(n2.remove()) - assertFalse(n2.remove()) - assertContents(list) - } - - @Test - fun testCondOps() { - val list = LockFreeLinkedListHead() - assertContents(list) - assertTrue(list.addLastIf(IntNode(1)) { true }) - assertContents(list, 1) - assertFalse(list.addLastIf(IntNode(2)) { false }) - assertContents(list, 1) - assertTrue(list.addLastIf(IntNode(3)) { true }) - assertContents(list, 1, 3) - assertFalse(list.addLastIf(IntNode(4)) { false }) - assertContents(list, 1, 3) - } - - @Suppress("UNUSED_VARIABLE") - @Test - fun testAtomicOpsSingle() { - val list = LockFreeLinkedListHead() - assertContents(list) - val n1 = IntNode(1).also { single(list.describeAddLast(it)) } - assertContents(list, 1) - val n2 = IntNode(2).also { single(list.describeAddLast(it)) } - assertContents(list, 1, 2) - val n3 = IntNode(3).also { single(list.describeAddLast(it)) } - assertContents(list, 1, 2, 3) - val n4 = IntNode(4).also { single(list.describeAddLast(it)) } - assertContents(list, 1, 2, 3, 4) - } - - private fun single(part: AtomicDesc) { - val operation = object : AtomicOp() { - init { - part.atomicOp = this - } - override fun prepare(affected: Any?): Any? = part.prepare(this) - override fun complete(affected: Any?, failure: Any?) = part.complete(this, failure) - } - assertTrue(operation.perform(null) == null) - } - - private fun assertContents(list: LockFreeLinkedListHead, vararg expected: Int) { - list.validate() - val n = expected.size - val actual = IntArray(n) - var index = 0 - list.forEach { actual[index++] = it.i } - assertEquals(n, index) - for (i in 0 until n) assertEquals(expected[i], actual[i], "item $i") - assertEquals(expected.isEmpty(), list.isEmpty) - } -} diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt index 7286496b4b..de5d491121 100644 --- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt @@ -96,75 +96,6 @@ public open class LinkedListNode : DisposableHandle { check(next.removeImpl()) { "Should remove" } return next } - - public inline fun removeFirstIfIsInstanceOfOrPeekIf(predicate: (T) -> Boolean): T? { - val next = _next - if (next === this) return null - if (next !is T) return null - if (predicate(next)) return next - check(next.removeImpl()) { "Should remove" } - return next - } -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class AddLastDesc actual constructor( - actual val queue: Node, - actual val node: T -) : AbstractAtomicDesc() { - override val affectedNode: Node get() = queue._prev - actual override fun finishPrepare(prepareOp: PrepareOp) {} - override fun onComplete() = queue.addLast(node) - actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual open class RemoveFirstDesc actual constructor( - actual val queue: LockFreeLinkedListNode -) : AbstractAtomicDesc() { - @Suppress("UNCHECKED_CAST") - actual val result: T get() = affectedNode as T - override val affectedNode: Node = queue.nextNode - actual override fun finishPrepare(prepareOp: PrepareOp) {} - override fun onComplete() { queue.removeFirstOrNull() } - actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual abstract class AbstractAtomicDesc : AtomicDesc() { - protected abstract val affectedNode: Node - actual abstract fun finishPrepare(prepareOp: PrepareOp) - protected abstract fun onComplete() - - actual open fun onPrepare(prepareOp: PrepareOp): Any? { - finishPrepare(prepareOp) - return null - } - - actual open fun onRemoved(affected: Node) {} - - actual final override fun prepare(op: AtomicOp<*>): Any? { - val affected = affectedNode - val failure = failure(affected) - if (failure != null) return failure - @Suppress("UNCHECKED_CAST") - return onPrepare(PrepareOp(affected, this, op)) - } - - actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() - protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default - protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds - protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) -} - -/** @suppress **This is unstable API and it is subject to change.** */ -public actual class PrepareOp( - actual val affected: LockFreeLinkedListNode, - actual val desc: AbstractAtomicDesc, - actual override val atomicOp: AtomicOp<*> -): OpDescriptor() { - override fun perform(affected: Any?): Any? = null - actual fun finishPrepare() {} } /** @suppress **This is unstable API and it is subject to change.** */ diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAddRemoveStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAddRemoveStressTest.kt deleted file mode 100644 index 3229e664c1..0000000000 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListAddRemoveStressTest.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlinx.atomicfu.* -import kotlinx.coroutines.* -import java.util.concurrent.* -import kotlin.concurrent.* -import kotlin.test.* - -class LockFreeLinkedListAddRemoveStressTest : TestBase() { - private class Node : LockFreeLinkedListNode() - - private val nRepeat = 100_000 * stressTestMultiplier - private val list = LockFreeLinkedListHead() - private val barrier = CyclicBarrier(3) - private val done = atomic(false) - private val removed = atomic(0) - - @Test - fun testStressAddRemove() { - val threads = ArrayList() - threads += testThread("adder") { - val node = Node() - list.addLast(node) - if (node.remove()) removed.incrementAndGet() - } - threads += testThread("remover") { - val node = list.removeFirstOrNull() - if (node != null) removed.incrementAndGet() - } - try { - for (i in 1..nRepeat) { - barrier.await() - barrier.await() - assertEquals(i, removed.value) - list.validate() - } - } finally { - done.value = true - barrier.await() - threads.forEach { it.join() } - } - } - - private fun testThread(name: String, op: () -> Unit) = thread(name = name) { - while (true) { - barrier.await() - if (done.value) break - op() - barrier.await() - } - } -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt deleted file mode 100644 index 2ac51b9b1d..0000000000 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlinx.coroutines.* -import org.junit.Test -import java.util.* -import java.util.concurrent.atomic.* -import kotlin.concurrent.* -import kotlin.test.* - -/** - * This stress test has 6 threads adding randomly first to the list and them immediately undoing - * this addition by remove, and 4 threads removing first node. The resulting list that is being - * stressed is very short. - */ -class LockFreeLinkedListShortStressTest : TestBase() { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - val list = LockFreeLinkedListHead() - - private val TEST_DURATION = 5000L * stressTestMultiplier - - val threads = mutableListOf() - private val nAdderThreads = 6 - private val nRemoverThreads = 4 - private val completedAdder = AtomicInteger() - private val completedRemover = AtomicInteger() - - private val undone = AtomicInteger() - private val missed = AtomicInteger() - private val removed = AtomicInteger() - - @Test - fun testStress() { - println("--- LockFreeLinkedListShortStressTest") - val deadline = System.currentTimeMillis() + TEST_DURATION - repeat(nAdderThreads) { threadId -> - threads += thread(start = false, name = "adder-$threadId") { - val rnd = Random() - while (System.currentTimeMillis() < deadline) { - var node: IntNode? = IntNode(threadId) - when (rnd.nextInt(3)) { - 0 -> list.addLast(node!!) - 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add - 2 -> { // just to test failed conditional add - assertFalse(list.addLastIf(node!!, { false })) - node = null - } - } - if (node != null) { - if (node.remove()) { - undone.incrementAndGet() - } else { - // randomly help other removal's completion - if (rnd.nextBoolean()) node.helpRemove() - missed.incrementAndGet() - } - } - } - completedAdder.incrementAndGet() - } - } - repeat(nRemoverThreads) { threadId -> - threads += thread(start = false, name = "remover-$threadId") { - while (System.currentTimeMillis() < deadline) { - val node = list.removeFirstOrNull() - if (node != null) removed.incrementAndGet() - - } - completedRemover.incrementAndGet() - } - } - threads.forEach { it.start() } - threads.forEach { it.join() } - println("Completed successfully ${completedAdder.get()} adder threads") - println("Completed successfully ${completedRemover.get()} remover threads") - println(" Adders undone ${undone.get()} node additions") - println(" Adders missed ${missed.get()} nodes") - println("Remover removed ${removed.get()} nodes") - assertEquals(nAdderThreads, completedAdder.get()) - assertEquals(nRemoverThreads, completedRemover.get()) - assertEquals(missed.get(), removed.get()) - assertTrue(undone.get() > 0) - assertTrue(missed.get() > 0) - list.validate() - } -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt deleted file mode 100644 index 4f1bb6ad02..0000000000 --- a/kotlinx-coroutines-core/jvm/test/lincheck/LockFreeListLincheckTest.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ -@file:Suppress("unused") - -package kotlinx.coroutines.lincheck - -import kotlinx.coroutines.* -import kotlinx.coroutines.internal.* -import org.jetbrains.kotlinx.lincheck.annotations.* -import org.jetbrains.kotlinx.lincheck.annotations.Operation -import org.jetbrains.kotlinx.lincheck.paramgen.* -import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* - -@Param(name = "value", gen = IntGen::class, conf = "1:5") -class LockFreeListLincheckTest : AbstractLincheckTest() { - class Node(val value: Int): LockFreeLinkedListNode() - - private val q: LockFreeLinkedListHead = LockFreeLinkedListHead() - - @Operation - fun addLast(@Param(name = "value") value: Int) { - q.addLast(Node(value)) - } - - @Operation - fun addLastIfNotSame(@Param(name = "value") value: Int) { - q.addLastIfPrev(Node(value)) { !it.isSame(value) } - } - - @Operation - fun removeFirst(): Int? { - val node = q.removeFirstOrNull() ?: return null - return (node as Node).value - } - - @Operation - fun removeFirstOrPeekIfNotSame(@Param(name = "value") value: Int): Int? { - val node = q.removeFirstIfIsInstanceOfOrPeekIf { !it.isSame(value) } ?: return null - return node.value - } - - private fun Any.isSame(value: Int) = this is Node && this.value == value - - override fun extractState(): Any { - val elements = ArrayList() - q.forEach { elements.add(it.value) } - return elements - } - - override fun ModelCheckingOptions.customize(isStressTest: Boolean) = - checkObstructionFreedom() -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/test/internal/LinkedListTest.kt b/kotlinx-coroutines-core/native/test/internal/LinkedListTest.kt deleted file mode 100644 index 44ddf471d7..0000000000 --- a/kotlinx-coroutines-core/native/test/internal/LinkedListTest.kt +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class LinkedListTest { - data class IntNode(val i: Int) : LockFreeLinkedListNode() - - @Test - fun testSimpleAddLastRemove() { - val list = LockFreeLinkedListHead() - assertContents(list) - val n1 = IntNode(1).apply { list.addLast(this) } - assertContents(list, 1) - val n2 = IntNode(2).apply { list.addLast(this) } - assertContents(list, 1, 2) - val n3 = IntNode(3).apply { list.addLast(this) } - assertContents(list, 1, 2, 3) - val n4 = IntNode(4).apply { list.addLast(this) } - assertContents(list, 1, 2, 3, 4) - assertTrue(n1.remove()) - assertContents(list, 2, 3, 4) - assertTrue(n3.remove()) - assertContents(list, 2, 4) - assertTrue(n4.remove()) - assertContents(list, 2) - assertTrue(n2.remove()) - assertFalse(n2.remove()) - assertContents(list) - } - - private fun assertContents(list: LockFreeLinkedListHead, vararg expected: Int) { - val n = expected.size - val actual = IntArray(n) - var index = 0 - list.forEach { actual[index++] = it.i } - assertEquals(n, index) - for (i in 0 until n) assertEquals(expected[i], actual[i], "item i") - assertEquals(expected.isEmpty(), list.isEmpty) - } -}