diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a277169065..c5c5d3b5c6 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -1045,7 +1045,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V public fun isSelected ()Z public fun onTimeout (JLkotlin/jvm/functions/Function1;)V - public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V public fun resumeWith (Ljava/lang/Object;)V @@ -1068,7 +1067,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance { public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V public abstract fun getCompletion ()Lkotlin/coroutines/Continuation; public abstract fun isSelected ()Z - public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object; public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V public abstract fun trySelect (Ljava/lang/Object;)Z diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index bed497909d..7b8f96b6e4 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -96,10 +96,10 @@ internal abstract class AbstractSendChannel : SendChannel { * @suppress **This is unstable API and it is subject to change.** */ protected fun sendBuffered(element: E): ReceiveOrClosed<*>? { - queue.addLastIfPrev(SendBuffered(element), { prev -> + queue.addLastIfPrev(SendBuffered(element)) { prev -> if (prev is ReceiveOrClosed<*>) return@sendBuffered prev true - }) + } return null } @@ -112,9 +112,10 @@ internal abstract class AbstractSendChannel : SendChannel { queue: LockFreeLinkedListHead, element: E ) : AddLastDesc>(queue, SendBuffered(element)) { - override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { - if (affected is ReceiveOrClosed<*>) return OFFER_FAILED - return null + override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { + is Closed<*> -> affected + is ReceiveOrClosed<*> -> OFFER_FAILED + else -> null } } @@ -168,18 +169,23 @@ internal abstract class AbstractSendChannel : SendChannel { } private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont -> - val send = SendElement(element, cont) loop@ while (true) { - val enqueueResult = enqueueSend(send) - when (enqueueResult) { - null -> { // enqueued successfully - cont.removeOnCancellation(send) - return@sc - } - is Closed<*> -> { - helpClose(enqueueResult) - cont.resumeWithException(enqueueResult.sendException) - return@sc + if (full) { + val send = SendElement(element, cont) + val enqueueResult = enqueueSend(send) + when { + enqueueResult == null -> { // enqueued successfully + cont.removeOnCancellation(send) + return@sc + } + enqueueResult is Closed<*> -> { + helpClose(enqueueResult) + cont.resumeWithException(enqueueResult.sendException) + return@sc + } + enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead + enqueueResult is Receive<*> -> {} // try to offer instead + else -> error("enqueueSend returned $enqueueResult") } } // hm... receiver is waiting or buffer is not full. try to offer @@ -206,12 +212,12 @@ internal abstract class AbstractSendChannel : SendChannel { * * ENQUEUE_FAILED -- buffer is not full (should not enqueue) * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue) */ - private fun enqueueSend(send: SendElement): Any? { + private fun enqueueSend(send: Send): Any? { if (isBufferAlwaysFull) { - queue.addLastIfPrev(send, { prev -> + queue.addLastIfPrev(send) { prev -> if (prev is ReceiveOrClosed<*>) return@enqueueSend prev true - }) + } } else { if (!queue.addLastIfPrevAndIf(send, { prev -> if (prev is ReceiveOrClosed<*>) return@enqueueSend prev @@ -333,10 +339,10 @@ internal abstract class AbstractSendChannel : SendChannel { ) : RemoveFirstDesc>(queue) { @JvmField var resumeToken: Any? = null - override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { - if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED - if (affected is Closed<*>) return affected - return null + override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { + is Closed<*> -> affected + !is ReceiveOrClosed<*> -> OFFER_FAILED + else -> null } override fun validatePrepared(node: ReceiveOrClosed): Boolean { @@ -346,30 +352,6 @@ internal abstract class AbstractSendChannel : SendChannel { } } - private inner class TryEnqueueSendDesc( - element: E, - select: SelectInstance, - block: suspend (SendChannel) -> R - ) : AddLastDesc>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) { - override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { - if (affected is ReceiveOrClosed<*>) { - return affected as? Closed<*> ?: ENQUEUE_FAILED - } - return null - } - - override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? { - if (!isBufferFull) return ENQUEUE_FAILED - return super.onPrepare(affected, next) - } - - override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) { - super.finishOnSuccess(affected, next) - // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there) - node.disposeOnSelect() - } - } - final override val onSend: SelectClause2> get() = object : SelectClause2> { override fun registerSelectClause2(select: SelectInstance, param: E, block: suspend (SendChannel) -> R) { @@ -381,26 +363,36 @@ internal abstract class AbstractSendChannel : SendChannel { while (true) { if (select.isSelected) return if (full) { - val enqueueOp = TryEnqueueSendDesc(element, select, block) - val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return + val node = SendSelect(element, this, select, block) + val enqueueResult = enqueueSend(node) when { - enqueueResult === ALREADY_SELECTED -> return - enqueueResult === ENQUEUE_FAILED -> {} // retry - enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException) - else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult") - } - } else { - val offerResult = offerSelectInternal(element, select) - when { - offerResult === ALREADY_SELECTED -> return - offerResult === OFFER_FAILED -> {} // retry - offerResult === OFFER_SUCCESS -> { - block.startCoroutineUnintercepted(receiver = this, completion = select.completion) + enqueueResult == null -> { // enqueued successfully + select.disposeOnSelect(node) return } - offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException) - else -> error("offerSelectInternal returned $offerResult") + enqueueResult is Closed<*> -> { + helpClose(enqueueResult) + throw recoverStackTrace(enqueueResult.sendException) + } + enqueueResult === ENQUEUE_FAILED -> {} // try to offer + enqueueResult is Receive<*> -> {} // try to offer + else -> error("enqueueSend returned $enqueueResult ") + } + } + // hm... receiver is waiting or buffer is not full. try to offer + val offerResult = offerSelectInternal(element, select) + when { + offerResult === ALREADY_SELECTED -> return + offerResult === OFFER_FAILED -> {} // retry + offerResult === OFFER_SUCCESS -> { + block.startCoroutineUnintercepted(receiver = this, completion = select.completion) + return + } + offerResult is Closed<*> -> { + helpClose(offerResult) + throw recoverStackTrace(offerResult.sendException) } + else -> error("offerSelectInternal returned $offerResult") } } } @@ -443,7 +435,7 @@ internal abstract class AbstractSendChannel : SendChannel { @JvmField val channel: SendChannel, @JvmField val select: SelectInstance, @JvmField val block: suspend (SendChannel) -> R - ) : LockFreeLinkedListNode(), Send, DisposableHandle { + ) : Send(), DisposableHandle { override fun tryResumeSend(idempotent: Any?): Any? = if (select.trySelect(idempotent)) SELECT_STARTED else null @@ -452,11 +444,7 @@ internal abstract class AbstractSendChannel : SendChannel { block.startCoroutine(receiver = channel, completion = select.completion) } - fun disposeOnSelect() { - select.disposeOnSelect(this) - } - - override fun dispose() { + override fun dispose() { // invoked on select completion remove() } @@ -470,7 +458,7 @@ internal abstract class AbstractSendChannel : SendChannel { internal class SendBuffered( @JvmField val element: E - ) : LockFreeLinkedListNode(), Send { + ) : Send() { override val pollResult: Any? get() = element override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } } @@ -556,8 +544,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel receiveSuspend(onClose: Int): R = suspendAtomicCancellableCoroutine sc@ { cont -> - val receive = ReceiveElement(cont as CancellableContinuation, onClose) + private suspend fun receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutine sc@ { cont -> + val receive = ReceiveElement(cont as CancellableContinuation, receiveMode) while (true) { if (enqueueReceive(receive)) { removeReceiveOnCancel(cont, receive) @@ -578,7 +566,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel): Boolean { val result = if (isBufferAlwaysEmpty) - queue.addLastIfPrev(receive, { it !is Send }) else + queue.addLastIfPrev(receive) { it !is Send } else queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty }) if (result) onReceiveEnqueued() return result @@ -659,10 +647,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) return affected - if (affected !is Send) return POLL_FAILED - return null + override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) { + is Closed<*> -> affected + !is Send -> POLL_FAILED + else -> null } @Suppress("UNCHECKED_CAST") @@ -674,30 +662,6 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( - select: SelectInstance, - block: suspend (Any?) -> R, - receiveMode: Int - ) : AddLastDesc>(queue, ReceiveSelect(select, block, receiveMode)) { - override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { - if (affected is Send) return ENQUEUE_FAILED - return null - } - - override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? { - if (!isBufferEmpty) return ENQUEUE_FAILED - return super.onPrepare(affected, next) - } - - override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) { - super.finishOnSuccess(affected, next) - // notify the there is one more receiver - onReceiveEnqueued() - // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there) - node.removeOnSelectCompletion() - } - } - final override val onReceive: SelectClause1 get() = object : SelectClause1 { override fun registerSelectClause1(select: SelectInstance, block: suspend (E) -> R) { @@ -710,7 +674,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_THROWS_ON_CLOSE)) return } else { val pollResult = pollSelectInternal(select) when { @@ -738,7 +702,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_NULL_ON_CLOSE)) return } else { val pollResult = pollSelectInternal(select) when { @@ -775,7 +739,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_RESULT)) return } else { val pollResult = pollSelectInternal(select) when { @@ -794,18 +758,15 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel registerEnqueueDesc( - select: SelectInstance, block: suspend (E) -> R, + private fun enqueueReceiveSelect( + select: SelectInstance, + block: suspend (Any?) -> R, receiveMode: Int ): Boolean { - @Suppress("UNCHECKED_CAST") - val enqueueOp = TryEnqueueReceiveDesc(select, block as suspend (Any?) -> R, receiveMode) - val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true - return when { - enqueueResult === ALREADY_SELECTED -> true - enqueueResult === ENQUEUE_FAILED -> false // retry - else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") - } + val node = ReceiveSelect(this, select, block, receiveMode) + val result = enqueueReceive(node) + if (result) select.disposeOnSelect(node) + return result } // ------ protected ------ @@ -917,7 +878,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel cont.resumeWithException(closed.receiveException) } } - override fun toString(): String = "ReceiveElement[$cont,receiveMode=$receiveMode]" + override fun toString(): String = "ReceiveElement[receiveMode=$receiveMode]" } private class ReceiveHasNext( @@ -957,10 +918,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( + private class ReceiveSelect( + @JvmField val channel: AbstractChannel, @JvmField val select: SelectInstance, @JvmField val block: suspend (Any?) -> R, @JvmField val receiveMode: Int @@ -987,13 +949,9 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel Unit /** * Represents sending waiter in the queue. */ -internal interface Send { - val pollResult: Any? // E | Closed - fun tryResumeSend(idempotent: Any?): Any? - fun completeResumeSend(token: Any) - fun resumeSendClosed(closed: Closed<*>) +internal abstract class Send : LockFreeLinkedListNode() { + abstract val pollResult: Any? // E | Closed + abstract fun tryResumeSend(idempotent: Any?): Any? + abstract fun completeResumeSend(token: Any) + abstract fun resumeSendClosed(closed: Closed<*>) } /** @@ -1074,11 +1032,11 @@ internal interface ReceiveOrClosed { internal class SendElement( override val pollResult: Any?, @JvmField val cont: CancellableContinuation -) : LockFreeLinkedListNode(), Send { +) : Send() { override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent) override fun completeResumeSend(token: Any) = cont.completeResume(token) override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException) - override fun toString(): String = "SendElement($pollResult)[$cont]" + override fun toString(): String = "SendElement($pollResult)" } /** @@ -1086,7 +1044,7 @@ internal class SendElement( */ internal class Closed( @JvmField val closeCause: Throwable? -) : LockFreeLinkedListNode(), Send, ReceiveOrClosed { +) : Send(), ReceiveOrClosed { val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE) val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index c053307099..fc1c72f067 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -66,7 +66,7 @@ public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): Abst public expect abstract class AbstractAtomicDesc : AtomicDesc { final override fun prepare(op: AtomicOp<*>): Any? final override fun complete(op: AtomicOp<*>, failure: Any?) - protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? + protected open fun failure(affected: LockFreeLinkedListNode): Any? protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 9555f2b9b9..4626fe1d38 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -110,11 +110,6 @@ public interface SelectInstance { */ public fun performAtomicTrySelect(desc: AtomicDesc): Any? - /** - * Performs action atomically when [isSelected] is `false`. - */ - public fun performAtomicIfNotSelected(desc: AtomicDesc): Any? - /** * Returns completion continuation of this select instance. * This select instance must be _selected_ first. @@ -129,6 +124,7 @@ public interface SelectInstance { /** * Disposes the specified handle when this instance is selected. + * Note, that [DisposableHandle.dispose] could be called multiple times. */ public fun disposeOnSelect(handle: DisposableHandle) } @@ -329,16 +325,14 @@ internal class SelectBuilderImpl( override fun disposeOnSelect(handle: DisposableHandle) { val node = DisposeNode(handle) - while (true) { // lock-free loop on state - val state = this.state - if (state === this) { - if (addLastIf(node, { this.state === this })) - return - } else { // already selected - handle.dispose() - return - } + // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times + if (!isSelected) { + addLast(node) // add handle to list + // double-check node after adding + if (!isSelected) return // all ok - still not selected } + // already selected + handle.dispose() } private fun doAfterSelect() { @@ -368,12 +362,11 @@ internal class SelectBuilderImpl( } } - override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null) - override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null) + override fun performAtomicTrySelect(desc: AtomicDesc): Any? = + AtomicSelectOp(desc).perform(null) private inner class AtomicSelectOp( - @JvmField val desc: AtomicDesc, - @JvmField val select: Boolean + @JvmField val desc: AtomicDesc ) : AtomicOp() { override fun prepare(affected: Any?): Any? { // only originator of operation makes preparation move of installing descriptor into this selector's state @@ -405,7 +398,7 @@ internal class SelectBuilderImpl( } private fun completeSelect(failure: Any?) { - val selectSuccess = select && failure == null + val selectSuccess = failure == null val update = if (selectSuccess) null else this@SelectBuilderImpl if (_state.compareAndSet(this@AtomicSelectOp, update)) { if (selectSuccess) diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index fa198e1371..3c72915379 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -246,16 +246,11 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { } is LockedQueue -> { check(state.owner !== owner) { "Already locked by $owner" } - val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block) - val failure = select.performAtomicIfNotSelected(enqueueOp) - when { - failure == null -> { // successfully enqueued - select.disposeOnSelect(enqueueOp.node) - return - } - failure === ALREADY_SELECTED -> return // already selected -- bail out - failure === ENQUEUE_FAIL -> {} // retry - else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure") + val node = LockSelect(owner, this, select, block) + if (state.addLastIf(node) { _state.value === state }) { + // successfully enqueued + select.disposeOnSelect(node) + return } } is OpDescriptor -> state.perform(this) // help @@ -291,19 +286,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { } } - private class TryEnqueueLockDesc( - @JvmField val mutex: MutexImpl, - owner: Any?, - queue: LockedQueue, - select: SelectInstance, - block: suspend (Mutex) -> R - ) : AddLastDesc>(queue, LockSelect(owner, mutex, select, block)) { - override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? { - if (mutex._state.value !== queue) return ENQUEUE_FAIL - return super.onPrepare(affected, next) - } - } - public override fun holdsLock(owner: Any) = _state.value.let { state -> when (state) { diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt new file mode 100644 index 0000000000..a066f6b3a9 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class SelectLinkedListChannelTest : TestBase() { + @Test + fun testSelectSendWhenClosed() = runTest { + expect(1) + val c = Channel(Channel.UNLIMITED) + c.send(1) // enqueue buffered element + c.close() // then close + assertFailsWith { + // select sender should fail + expect(2) + select { + c.onSend(2) { + expectUnreached() + } + } + } + finish(3) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index ed8b8d3691..6072cc2cbb 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() { finish(10) } + @Test + fun testSelectSendWhenClosed() = runTest { + expect(1) + val c = Channel(Channel.RENDEZVOUS) + val sender = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + c.send(1) // enqueue sender + expectUnreached() + } + c.close() // then close + assertFailsWith { + // select sender should fail + expect(3) + select { + c.onSend(2) { + expectUnreached() + } + } + } + sender.cancel() + finish(4) + } + // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt index 3f179d1d81..6050901058 100644 --- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("unused") @@ -129,13 +129,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { actual final override fun prepare(op: AtomicOp<*>): Any? { val affected = affectedNode val next = affected._next - val failure = failure(affected, next) + val failure = failure(affected) if (failure != null) return failure return onPrepare(affected, next) } actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() - protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default + protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt index 7d28de2574..d3d168a427 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt @@ -254,26 +254,6 @@ public actual open class LockFreeLinkedListNode { finishRemove(removed.ref) } - public open fun describeRemove() : AtomicDesc? { - if (isRemoved) return null // fast path if was already removed - return object : AbstractAtomicDesc() { - private val _originalNext = atomic(null) - override val affectedNode: Node? get() = this@LockFreeLinkedListNode - override val originalNext get() = _originalNext.value - override fun failure(affected: Node, next: Any): Any? = - if (next is Removed) ALREADY_REMOVED else null - override fun onPrepare(affected: Node, next: Node): Any? { - // Note: onPrepare must use CAS to make sure the stale invocation is not - // going to overwrite the previous decision on successful preparation. - // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes - _originalNext.compareAndSet(null, next) - return null // always success - } - override fun updatedNext(affected: Node, next: Node) = next.removed() - override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next) - } - } - public actual fun removeFirstOrNull(): Node? { while (true) { // try to linearize val first = next as Node @@ -376,7 +356,7 @@ public actual open class LockFreeLinkedListNode { final override val originalNext: Node? get() = _originalNext.value // check node predicates here, must signal failure if affect is not of type T - protected override fun failure(affected: Node, next: Any): Any? = + protected override fun failure(affected: Node): Any? = if (affected === queue) LIST_EMPTY else null // validate the resulting node (return false if it should be deleted) @@ -408,7 +388,7 @@ public actual open class LockFreeLinkedListNode { protected abstract val affectedNode: Node? protected abstract val originalNext: Node? protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!! - protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed + protected open fun failure(affected: Node): Any? = null // next: Node | Removed protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure protected abstract fun updatedNext(affected: Node, next: Node): Any @@ -460,7 +440,7 @@ public actual open class LockFreeLinkedListNode { continue // and retry } // next: Node | Removed - val failure = failure(affected, next) + val failure = failure(affected) if (failure != null) return failure // signal failure if (retry(affected, next)) continue // retry operation val prepareOp = PrepareOp(next as Node, op as AtomicOp, this) @@ -684,8 +664,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { // just a defensive programming -- makes sure that list head sentinel is never removed public actual final override fun remove(): Boolean = throw UnsupportedOperationException() - public final override fun describeRemove(): Nothing = throw UnsupportedOperationException() - internal fun validate() { var prev: Node = this var cur: Node = next as Node diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt index 1400441fb8..9238681e8b 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -47,29 +47,6 @@ class LockFreeLinkedListTest { assertContents(list, 1, 3) } - @Test - fun testRemoveTwoAtomic() { - val list = LockFreeLinkedListHead() - val n1 = IntNode(1).apply { list.addLast(this) } - val n2 = IntNode(2).apply { list.addLast(this) } - assertContents(list, 1, 2) - assertFalse(n1.isRemoved) - assertFalse(n2.isRemoved) - val remove1Desc = n1.describeRemove()!! - val remove2Desc = n2.describeRemove()!! - val operation = object : AtomicOp() { - override fun prepare(affected: Any?): Any? = remove1Desc.prepare(this) ?: remove2Desc.prepare(this) - override fun complete(affected: Any?, failure: Any?) { - remove1Desc.complete(this, failure) - remove2Desc.complete(this, failure) - } - } - assertTrue(operation.perform(null) == null) - assertTrue(n1.isRemoved) - assertTrue(n2.isRemoved) - assertContents(list) - } - @Test fun testAtomicOpsSingle() { val list = LockFreeLinkedListHead() @@ -82,16 +59,6 @@ class LockFreeLinkedListTest { assertContents(list, 1, 2, 3) val n4 = IntNode(4).also { single(list.describeAddLast(it)) } assertContents(list, 1, 2, 3, 4) - single(n3.describeRemove()!!) - assertContents(list, 1, 2, 4) - assertTrue(n3.describeRemove() == null) - single(list.describeRemoveFirst()) - assertContents(list, 2, 4) - assertTrue(n1.describeRemove() == null) - assertTrue(n2.remove()) - assertContents(list, 4) - assertTrue(n4.remove()) - assertContents(list) } private fun single(part: AtomicDesc) { diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt new file mode 100644 index 0000000000..7f924dba09 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectMemoryLeakStressTest.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +class SelectMemoryLeakStressTest : TestBase() { + private val nRepeat = 1_000_000 * stressTestMultiplier + + @Test + fun testLeakRegisterSend() = runTest { + expect(1) + val leak = Channel() + val data = Channel(1) + repeat(nRepeat) { value -> + data.send(value) + val bigValue = bigValue() // new instance + select { + leak.onSend("LEAK") { + println("Capture big value into this lambda: $bigValue") + expectUnreached() + } + data.onReceive { received -> + assertEquals(value, received) + expect(value + 2) + } + } + } + finish(nRepeat + 2) + } + + @Test + fun testLeakRegisterReceive() = runTest { + expect(1) + val leak = Channel() + val data = Channel(1) + repeat(nRepeat) { value -> + val bigValue = bigValue() // new instance + select { + leak.onReceive { + println("Capture big value into this lambda: $bigValue") + expectUnreached() + } + data.onSend(value) { + expect(value + 2) + } + } + assertEquals(value, data.receive()) + } + finish(nRepeat + 2) + } + + // capture big value for fast OOM in case of a bug + private fun bigValue(): ByteArray = ByteArray(4096) +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt index 7e9657e2c5..07fe1a06c7 100644 --- a/kotlinx-coroutines-core/native/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/native/src/internal/LinkedList.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.internal @@ -127,13 +127,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { actual final override fun prepare(op: AtomicOp<*>): Any? { val affected = affectedNode val next = affected._next - val failure = failure(affected, next) + val failure = failure(affected) if (failure != null) return failure return onPrepare(affected, next) } actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() - protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default + protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) }