Skip to content

Commit 3807a74

Browse files
elizarovqwwdfsad
authored andcommitted
Optimize select expression registration phase (#1445)
* Optimize select expression registration phase There is no need for multi-word atomic performAtomicIfNotSelected operation when enqueuing select node for operation. We can simply enqueue (addLast) xxxSelect node (SendSelect, ReceiveSelect, LockSelect). If the coroutine that rendezvous with this node finds out that the select expression was already selected, then it'll try again. * Removed SelectInstance.performAtomicIfNotSelected function * Removed Mutex.TryEnqueueLockDesc class, simplified onLock * Removed AbstractSendChannel.TryEnqueueSendDesc class, simpler onSend * Removed AbstractChannel.TryEnqueueReceiveDesc class, simpler onReceive * Simplified SelectInstance.disposeOnSelect. It does not have to do atomic addLastIf operation. Can do a simple addLast. * Fixed unlimited channel select onSend on closed channel It was hanging not being able to properly see that the channel was already closed at that send attempt should fail.
1 parent 0342a0a commit 3807a74

File tree

12 files changed

+228
-241
lines changed

12 files changed

+228
-241
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

-2
Original file line numberDiff line numberDiff line change
@@ -1046,7 +1046,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10461046
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
10471047
public fun isSelected ()Z
10481048
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
1049-
public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10501049
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10511050
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10521051
public fun resumeWith (Ljava/lang/Object;)V
@@ -1069,7 +1068,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10691068
public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V
10701069
public abstract fun getCompletion ()Lkotlin/coroutines/Continuation;
10711070
public abstract fun isSelected ()Z
1072-
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10731071
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10741072
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10751073
public abstract fun trySelect (Ljava/lang/Object;)Z

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+88-130
Large diffs are not rendered by default.

kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.internal
@@ -66,7 +66,7 @@ public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): Abst
6666
public expect abstract class AbstractAtomicDesc : AtomicDesc {
6767
final override fun prepare(op: AtomicOp<*>): Any?
6868
final override fun complete(op: AtomicOp<*>, failure: Any?)
69-
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
69+
protected open fun failure(affected: LockFreeLinkedListNode): Any?
7070
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
7171
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
7272
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)

kotlinx-coroutines-core/common/src/selects/Select.kt

+12-19
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,6 @@ public interface SelectInstance<in R> {
110110
*/
111111
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
112112

113-
/**
114-
* Performs action atomically when [isSelected] is `false`.
115-
*/
116-
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
117-
118113
/**
119114
* Returns completion continuation of this select instance.
120115
* This select instance must be _selected_ first.
@@ -129,6 +124,7 @@ public interface SelectInstance<in R> {
129124

130125
/**
131126
* Disposes the specified handle when this instance is selected.
127+
* Note, that [DisposableHandle.dispose] could be called multiple times.
132128
*/
133129
public fun disposeOnSelect(handle: DisposableHandle)
134130
}
@@ -329,16 +325,14 @@ internal class SelectBuilderImpl<in R>(
329325

330326
override fun disposeOnSelect(handle: DisposableHandle) {
331327
val node = DisposeNode(handle)
332-
while (true) { // lock-free loop on state
333-
val state = this.state
334-
if (state === this) {
335-
if (addLastIf(node, { this.state === this }))
336-
return
337-
} else { // already selected
338-
handle.dispose()
339-
return
340-
}
328+
// check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
329+
if (!isSelected) {
330+
addLast(node) // add handle to list
331+
// double-check node after adding
332+
if (!isSelected) return // all ok - still not selected
341333
}
334+
// already selected
335+
handle.dispose()
342336
}
343337

344338
private fun doAfterSelect() {
@@ -368,12 +362,11 @@ internal class SelectBuilderImpl<in R>(
368362
}
369363
}
370364

371-
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
372-
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
365+
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
366+
AtomicSelectOp(desc).perform(null)
373367

374368
private inner class AtomicSelectOp(
375-
@JvmField val desc: AtomicDesc,
376-
@JvmField val select: Boolean
369+
@JvmField val desc: AtomicDesc
377370
) : AtomicOp<Any?>() {
378371
override fun prepare(affected: Any?): Any? {
379372
// only originator of operation makes preparation move of installing descriptor into this selector's state
@@ -405,7 +398,7 @@ internal class SelectBuilderImpl<in R>(
405398
}
406399

407400
private fun completeSelect(failure: Any?) {
408-
val selectSuccess = select && failure == null
401+
val selectSuccess = failure == null
409402
val update = if (selectSuccess) null else this@SelectBuilderImpl
410403
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
411404
if (selectSuccess)

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+5-23
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,11 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
246246
}
247247
is LockedQueue -> {
248248
check(state.owner !== owner) { "Already locked by $owner" }
249-
val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
250-
val failure = select.performAtomicIfNotSelected(enqueueOp)
251-
when {
252-
failure == null -> { // successfully enqueued
253-
select.disposeOnSelect(enqueueOp.node)
254-
return
255-
}
256-
failure === ALREADY_SELECTED -> return // already selected -- bail out
257-
failure === ENQUEUE_FAIL -> {} // retry
258-
else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
249+
val node = LockSelect(owner, this, select, block)
250+
if (state.addLastIf(node) { _state.value === state }) {
251+
// successfully enqueued
252+
select.disposeOnSelect(node)
253+
return
259254
}
260255
}
261256
is OpDescriptor -> state.perform(this) // help
@@ -291,19 +286,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
291286
}
292287
}
293288

294-
private class TryEnqueueLockDesc<R>(
295-
@JvmField val mutex: MutexImpl,
296-
owner: Any?,
297-
queue: LockedQueue,
298-
select: SelectInstance<R>,
299-
block: suspend (Mutex) -> R
300-
) : AddLastDesc<LockSelect<R>>(queue, LockSelect(owner, mutex, select, block)) {
301-
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
302-
if (mutex._state.value !== queue) return ENQUEUE_FAIL
303-
return super.onPrepare(affected, next)
304-
}
305-
}
306-
307289
public override fun holdsLock(owner: Any) =
308290
_state.value.let { state ->
309291
when (state) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.selects
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlin.test.*
10+
11+
class SelectLinkedListChannelTest : TestBase() {
12+
@Test
13+
fun testSelectSendWhenClosed() = runTest {
14+
expect(1)
15+
val c = Channel<Int>(Channel.UNLIMITED)
16+
c.send(1) // enqueue buffered element
17+
c.close() // then close
18+
assertFailsWith<ClosedSendChannelException> {
19+
// select sender should fail
20+
expect(2)
21+
select {
22+
c.onSend(2) {
23+
expectUnreached()
24+
}
25+
}
26+
}
27+
finish(3)
28+
}
29+
}

kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt

+23
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() {
403403
finish(10)
404404
}
405405

406+
@Test
407+
fun testSelectSendWhenClosed() = runTest {
408+
expect(1)
409+
val c = Channel<Int>(Channel.RENDEZVOUS)
410+
val sender = launch(start = CoroutineStart.UNDISPATCHED) {
411+
expect(2)
412+
c.send(1) // enqueue sender
413+
expectUnreached()
414+
}
415+
c.close() // then close
416+
assertFailsWith<ClosedSendChannelException> {
417+
// select sender should fail
418+
expect(3)
419+
select {
420+
c.onSend(2) {
421+
expectUnreached()
422+
}
423+
}
424+
}
425+
sender.cancel()
426+
finish(4)
427+
}
428+
406429
// only for debugging
407430
internal fun <R> SelectBuilder<R>.default(block: suspend () -> R) {
408431
this as SelectBuilderImpl // type assertion

kotlinx-coroutines-core/js/src/internal/LinkedList.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
@file:Suppress("unused")
@@ -129,13 +129,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() {
129129
actual final override fun prepare(op: AtomicOp<*>): Any? {
130130
val affected = affectedNode
131131
val next = affected._next
132-
val failure = failure(affected, next)
132+
val failure = failure(affected)
133133
if (failure != null) return failure
134134
return onPrepare(affected, next)
135135
}
136136

137137
actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
138-
protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
138+
protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default
139139
protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
140140
protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
141141
}

kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt

+3-25
Original file line numberDiff line numberDiff line change
@@ -254,26 +254,6 @@ public actual open class LockFreeLinkedListNode {
254254
finishRemove(removed.ref)
255255
}
256256

257-
public open fun describeRemove() : AtomicDesc? {
258-
if (isRemoved) return null // fast path if was already removed
259-
return object : AbstractAtomicDesc() {
260-
private val _originalNext = atomic<Node?>(null)
261-
override val affectedNode: Node? get() = this@LockFreeLinkedListNode
262-
override val originalNext get() = _originalNext.value
263-
override fun failure(affected: Node, next: Any): Any? =
264-
if (next is Removed) ALREADY_REMOVED else null
265-
override fun onPrepare(affected: Node, next: Node): Any? {
266-
// Note: onPrepare must use CAS to make sure the stale invocation is not
267-
// going to overwrite the previous decision on successful preparation.
268-
// Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
269-
_originalNext.compareAndSet(null, next)
270-
return null // always success
271-
}
272-
override fun updatedNext(affected: Node, next: Node) = next.removed()
273-
override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
274-
}
275-
}
276-
277257
public actual fun removeFirstOrNull(): Node? {
278258
while (true) { // try to linearize
279259
val first = next as Node
@@ -376,7 +356,7 @@ public actual open class LockFreeLinkedListNode {
376356
final override val originalNext: Node? get() = _originalNext.value
377357

378358
// check node predicates here, must signal failure if affect is not of type T
379-
protected override fun failure(affected: Node, next: Any): Any? =
359+
protected override fun failure(affected: Node): Any? =
380360
if (affected === queue) LIST_EMPTY else null
381361

382362
// validate the resulting node (return false if it should be deleted)
@@ -408,7 +388,7 @@ public actual open class LockFreeLinkedListNode {
408388
protected abstract val affectedNode: Node?
409389
protected abstract val originalNext: Node?
410390
protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
411-
protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
391+
protected open fun failure(affected: Node): Any? = null // next: Node | Removed
412392
protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
413393
protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
414394
protected abstract fun updatedNext(affected: Node, next: Node): Any
@@ -460,7 +440,7 @@ public actual open class LockFreeLinkedListNode {
460440
continue // and retry
461441
}
462442
// next: Node | Removed
463-
val failure = failure(affected, next)
443+
val failure = failure(affected)
464444
if (failure != null) return failure // signal failure
465445
if (retry(affected, next)) continue // retry operation
466446
val prepareOp = PrepareOp(next as Node, op as AtomicOp<Node>, this)
@@ -684,8 +664,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
684664
// just a defensive programming -- makes sure that list head sentinel is never removed
685665
public actual final override fun remove(): Boolean = throw UnsupportedOperationException()
686666

687-
public final override fun describeRemove(): Nothing = throw UnsupportedOperationException()
688-
689667
internal fun validate() {
690668
var prev: Node = this
691669
var cur: Node = next as Node

kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListTest.kt

+1-34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
44

55
package kotlinx.coroutines.internal
@@ -47,29 +47,6 @@ class LockFreeLinkedListTest {
4747
assertContents(list, 1, 3)
4848
}
4949

50-
@Test
51-
fun testRemoveTwoAtomic() {
52-
val list = LockFreeLinkedListHead()
53-
val n1 = IntNode(1).apply { list.addLast(this) }
54-
val n2 = IntNode(2).apply { list.addLast(this) }
55-
assertContents(list, 1, 2)
56-
assertFalse(n1.isRemoved)
57-
assertFalse(n2.isRemoved)
58-
val remove1Desc = n1.describeRemove()!!
59-
val remove2Desc = n2.describeRemove()!!
60-
val operation = object : AtomicOp<Any?>() {
61-
override fun prepare(affected: Any?): Any? = remove1Desc.prepare(this) ?: remove2Desc.prepare(this)
62-
override fun complete(affected: Any?, failure: Any?) {
63-
remove1Desc.complete(this, failure)
64-
remove2Desc.complete(this, failure)
65-
}
66-
}
67-
assertTrue(operation.perform(null) == null)
68-
assertTrue(n1.isRemoved)
69-
assertTrue(n2.isRemoved)
70-
assertContents(list)
71-
}
72-
7350
@Test
7451
fun testAtomicOpsSingle() {
7552
val list = LockFreeLinkedListHead()
@@ -82,16 +59,6 @@ class LockFreeLinkedListTest {
8259
assertContents(list, 1, 2, 3)
8360
val n4 = IntNode(4).also { single(list.describeAddLast(it)) }
8461
assertContents(list, 1, 2, 3, 4)
85-
single(n3.describeRemove()!!)
86-
assertContents(list, 1, 2, 4)
87-
assertTrue(n3.describeRemove() == null)
88-
single(list.describeRemoveFirst())
89-
assertContents(list, 2, 4)
90-
assertTrue(n1.describeRemove() == null)
91-
assertTrue(n2.remove())
92-
assertContents(list, 4)
93-
assertTrue(n4.remove())
94-
assertContents(list)
9562
}
9663

9764
private fun single(part: AtomicDesc) {

0 commit comments

Comments
 (0)