Skip to content

Optimize select expression registration phase #1445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
public fun isSelected ()Z
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
Expand All @@ -1068,7 +1067,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V
public abstract fun getCompletion ()Lkotlin/coroutines/Continuation;
public abstract fun isSelected ()Z
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect (Ljava/lang/Object;)Z
Expand Down
218 changes: 88 additions & 130 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal
Expand Down Expand Up @@ -66,7 +66,7 @@ public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): Abst
public expect abstract class AbstractAtomicDesc : AtomicDesc {
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
protected open fun failure(affected: LockFreeLinkedListNode): Any?
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
Expand Down
31 changes: 12 additions & 19 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ public interface SelectInstance<in R> {
*/
public fun performAtomicTrySelect(desc: AtomicDesc): Any?

/**
* Performs action atomically when [isSelected] is `false`.
*/
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?

/**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
Expand All @@ -129,6 +124,7 @@ public interface SelectInstance<in R> {

/**
* Disposes the specified handle when this instance is selected.
* Note, that [DisposableHandle.dispose] could be called multiple times.
*/
public fun disposeOnSelect(handle: DisposableHandle)
}
Expand Down Expand Up @@ -329,16 +325,14 @@ internal class SelectBuilderImpl<in R>(

override fun disposeOnSelect(handle: DisposableHandle) {
val node = DisposeNode(handle)
while (true) { // lock-free loop on state
val state = this.state
if (state === this) {
if (addLastIf(node, { this.state === this }))
return
} else { // already selected
handle.dispose()
return
}
// check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
if (!isSelected) {
addLast(node) // add handle to list
// double-check node after adding
if (!isSelected) return // all ok - still not selected
}
// already selected
handle.dispose()
}

private fun doAfterSelect() {
Expand Down Expand Up @@ -368,12 +362,11 @@ internal class SelectBuilderImpl<in R>(
}
}

override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
AtomicSelectOp(desc).perform(null)

private inner class AtomicSelectOp(
@JvmField val desc: AtomicDesc,
@JvmField val select: Boolean
@JvmField val desc: AtomicDesc
) : AtomicOp<Any?>() {
override fun prepare(affected: Any?): Any? {
// only originator of operation makes preparation move of installing descriptor into this selector's state
Expand Down Expand Up @@ -405,7 +398,7 @@ internal class SelectBuilderImpl<in R>(
}

private fun completeSelect(failure: Any?) {
val selectSuccess = select && failure == null
val selectSuccess = failure == null
val update = if (selectSuccess) null else this@SelectBuilderImpl
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
if (selectSuccess)
Expand Down
28 changes: 5 additions & 23 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,11 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
val failure = select.performAtomicIfNotSelected(enqueueOp)
when {
failure == null -> { // successfully enqueued
select.disposeOnSelect(enqueueOp.node)
return
}
failure === ALREADY_SELECTED -> return // already selected -- bail out
failure === ENQUEUE_FAIL -> {} // retry
else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
val node = LockSelect(owner, this, select, block)
if (state.addLastIf(node) { _state.value === state }) {
// successfully enqueued
select.disposeOnSelect(node)
return
}
}
is OpDescriptor -> state.perform(this) // help
Expand Down Expand Up @@ -291,19 +286,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}
}

private class TryEnqueueLockDesc<R>(
@JvmField val mutex: MutexImpl,
owner: Any?,
queue: LockedQueue,
select: SelectInstance<R>,
block: suspend (Mutex) -> R
) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
if (mutex._state.value !== queue) return ENQUEUE_FAIL
return super.onPrepare(affected, next)
}
}

public override fun holdsLock(owner: Any) =
_state.value.let { state ->
when (state) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.selects

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

class SelectLinkedListChannelTest : TestBase() {
@Test
fun testSelectSendWhenClosed() = runTest {
expect(1)
val c = Channel<Int>(Channel.UNLIMITED)
c.send(1) // enqueue buffered element
c.close() // then close
assertFailsWith<ClosedSendChannelException> {
// select sender should fail
expect(2)
select {
c.onSend(2) {
expectUnreached()
}
}
}
finish(3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() {
finish(10)
}

@Test
fun testSelectSendWhenClosed() = runTest {
expect(1)
val c = Channel<Int>(Channel.RENDEZVOUS)
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
expect(2)
c.send(1) // enqueue sender
expectUnreached()
}
c.close() // then close
assertFailsWith<ClosedSendChannelException> {
// select sender should fail
expect(3)
select {
c.onSend(2) {
expectUnreached()
}
}
}
sender.cancel()
finish(4)
}

// only for debugging
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/js/src/internal/LinkedList.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("unused")
Expand Down Expand Up @@ -129,13 +129,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() {
actual final override fun prepare(op: AtomicOp<*>): Any? {
val affected = affectedNode
val next = affected._next
val failure = failure(affected, next)
val failure = failure(affected)
if (failure != null) return failure
return onPrepare(affected, next)
}

actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default
protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
Expand Down
28 changes: 3 additions & 25 deletions kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
Original file line number Diff line number Diff line change
Expand Up @@ -254,26 +254,6 @@ public actual open class LockFreeLinkedListNode {
finishRemove(removed.ref)
}

public open fun describeRemove() : AtomicDesc? {
if (isRemoved) return null // fast path if was already removed
return object : AbstractAtomicDesc() {
private val _originalNext = atomic<Node?>(null)
override val affectedNode: Node? get() = this@LockFreeLinkedListNode
override val originalNext get() = _originalNext.value
override fun failure(affected: Node, next: Any): Any? =
if (next is Removed) ALREADY_REMOVED else null
override fun onPrepare(affected: Node, next: Node): Any? {
// Note: onPrepare must use CAS to make sure the stale invocation is not
// going to overwrite the previous decision on successful preparation.
// Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
_originalNext.compareAndSet(null, next)
return null // always success
}
override fun updatedNext(affected: Node, next: Node) = next.removed()
override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
}
}

public actual fun removeFirstOrNull(): Node? {
while (true) { // try to linearize
val first = next as Node
Expand Down Expand Up @@ -376,7 +356,7 @@ public actual open class LockFreeLinkedListNode {
final override val originalNext: Node? get() = _originalNext.value

// check node predicates here, must signal failure if affect is not of type T
protected override fun failure(affected: Node, next: Any): Any? =
protected override fun failure(affected: Node): Any? =
if (affected === queue) LIST_EMPTY else null

// validate the resulting node (return false if it should be deleted)
Expand Down Expand Up @@ -408,7 +388,7 @@ public actual open class LockFreeLinkedListNode {
protected abstract val affectedNode: Node?
protected abstract val originalNext: Node?
protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
protected open fun failure(affected: Node): Any? = null // next: Node | Removed
protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
protected abstract fun updatedNext(affected: Node, next: Node): Any
Expand Down Expand Up @@ -460,7 +440,7 @@ public actual open class LockFreeLinkedListNode {
continue // and retry
}
// next: Node | Removed
val failure = failure(affected, next)
val failure = failure(affected)
if (failure != null) return failure // signal failure
if (retry(affected, next)) continue // retry operation
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
Expand Down Expand Up @@ -684,8 +664,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
// just a defensive programming -- makes sure that list head sentinel is never removed
public actual final override fun remove(): Boolean = throw UnsupportedOperationException()

public final override fun describeRemove(): Nothing = throw UnsupportedOperationException()

internal fun validate() {
var prev: Node = this
var cur: Node = next as Node
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal
Expand Down Expand Up @@ -47,29 +47,6 @@ class LockFreeLinkedListTest {
assertContents(list, 1, 3)
}

@Test
fun testRemoveTwoAtomic() {
val list = LockFreeLinkedListHead()
val n1 = IntNode(1).apply { list.addLast(this) }
val n2 = IntNode(2).apply { list.addLast(this) }
assertContents(list, 1, 2)
assertFalse(n1.isRemoved)
assertFalse(n2.isRemoved)
val remove1Desc = n1.describeRemove()!!
val remove2Desc = n2.describeRemove()!!
val operation = object : AtomicOp<Any?>() {
override fun prepare(affected: Any?): Any? = remove1Desc.prepare(this) ?: remove2Desc.prepare(this)
override fun complete(affected: Any?, failure: Any?) {
remove1Desc.complete(this, failure)
remove2Desc.complete(this, failure)
}
}
assertTrue(operation.perform(null) == null)
assertTrue(n1.isRemoved)
assertTrue(n2.isRemoved)
assertContents(list)
}

@Test
fun testAtomicOpsSingle() {
val list = LockFreeLinkedListHead()
Expand All @@ -82,16 +59,6 @@ class LockFreeLinkedListTest {
assertContents(list, 1, 2, 3)
val n4 = IntNode(4).also { single(list.describeAddLast(it)) }
assertContents(list, 1, 2, 3, 4)
single(n3.describeRemove()!!)
assertContents(list, 1, 2, 4)
assertTrue(n3.describeRemove() == null)
single(list.describeRemoveFirst())
assertContents(list, 2, 4)
assertTrue(n1.describeRemove() == null)
assertTrue(n2.remove())
assertContents(list, 4)
assertTrue(n4.remove())
assertContents(list)
}

private fun single(part: AtomicDesc) {
Expand Down
Loading