Skip to content

Commit d89c41e

Browse files
committed
Optimize select expression registration phase
There is no need for multi-word atomic performAtomicIfNotSelected operation when enqueuing select node for operation. We can simply enqueue (addLast) xxxSelect node (SendSelect, ReceiveSelect, LockSelect). If the coroutine that rendezvous with this node finds out that the select expression was already selected, then it'll try again. * Removed SelectInstance.performAtomicIfNotSelected function * Removed Mutex.TryEnqueueLockDesc class, simplified onLock * Removed AbstractSendChannel.TryEnqueueSendDesc class, simpler onSend * Removed AbstractChannel.TryEnqueueReceiveDesc class, simpler onReceive * Simplified SelectInstance.disposeOnSelect. It does not have to do atomic addLastIf operation. Can do a simple addLast.
1 parent 64902a9 commit d89c41e

File tree

5 files changed

+125
-150
lines changed

5 files changed

+125
-150
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

-2
Original file line numberDiff line numberDiff line change
@@ -1045,7 +1045,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10451045
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
10461046
public fun isSelected ()Z
10471047
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
1048-
public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10491048
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10501049
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10511050
public fun resumeWith (Ljava/lang/Object;)V
@@ -1068,7 +1067,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10681067
public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V
10691068
public abstract fun getCompletion ()Lkotlin/coroutines/Continuation;
10701069
public abstract fun isSelected ()Z
1071-
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10721070
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10731071
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
10741072
public abstract fun trySelect (Ljava/lang/Object;)Z

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+49-106
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
9696
* @suppress **This is unstable API and it is subject to change.**
9797
*/
9898
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
99-
queue.addLastIfPrev(SendBuffered(element), { prev ->
99+
queue.addLastIfPrev(SendBuffered(element)) { prev ->
100100
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
101101
true
102-
})
102+
}
103103
return null
104104
}
105105

@@ -170,8 +170,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
170170
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
171171
val send = SendElement(element, cont)
172172
loop@ while (true) {
173-
val enqueueResult = enqueueSend(send)
174-
when (enqueueResult) {
173+
when (val enqueueResult = enqueueSend(send)) {
175174
null -> { // enqueued successfully
176175
cont.removeOnCancellation(send)
177176
return@sc
@@ -206,12 +205,12 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
206205
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
207206
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
208207
*/
209-
private fun enqueueSend(send: SendElement): Any? {
208+
private fun enqueueSend(send: Send): Any? {
210209
if (isBufferAlwaysFull) {
211-
queue.addLastIfPrev(send, { prev ->
210+
queue.addLastIfPrev(send) { prev ->
212211
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
213212
true
214-
})
213+
}
215214
} else {
216215
if (!queue.addLastIfPrevAndIf(send, { prev ->
217216
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
@@ -346,30 +345,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
346345
}
347346
}
348347

349-
private inner class TryEnqueueSendDesc<R>(
350-
element: E,
351-
select: SelectInstance<R>,
352-
block: suspend (SendChannel<E>) -> R
353-
) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
354-
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
355-
if (affected is ReceiveOrClosed<*>) {
356-
return affected as? Closed<*> ?: ENQUEUE_FAILED
357-
}
358-
return null
359-
}
360-
361-
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
362-
if (!isBufferFull) return ENQUEUE_FAILED
363-
return super.onPrepare(affected, next)
364-
}
365-
366-
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
367-
super.finishOnSuccess(affected, next)
368-
// we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
369-
node.disposeOnSelect()
370-
}
371-
}
372-
373348
final override val onSend: SelectClause2<E, SendChannel<E>>
374349
get() = object : SelectClause2<E, SendChannel<E>> {
375350
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
@@ -381,27 +356,30 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
381356
while (true) {
382357
if (select.isSelected) return
383358
if (full) {
384-
val enqueueOp = TryEnqueueSendDesc(element, select, block)
385-
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
386-
when {
387-
enqueueResult === ALREADY_SELECTED -> return
388-
enqueueResult === ENQUEUE_FAILED -> {} // retry
389-
enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
390-
else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
391-
}
392-
} else {
393-
val offerResult = offerSelectInternal(element, select)
394-
when {
395-
offerResult === ALREADY_SELECTED -> return
396-
offerResult === OFFER_FAILED -> {} // retry
397-
offerResult === OFFER_SUCCESS -> {
398-
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
359+
val node = SendSelect(element, this, select, block)
360+
when (val enqueueResult = enqueueSend(node)) {
361+
null -> { // enqueued successfully
362+
select.disposeOnSelect(node)
399363
return
400364
}
401-
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
402-
else -> error("offerSelectInternal returned $offerResult")
365+
is Closed<*> -> {
366+
helpClose(enqueueResult)
367+
throw recoverStackTrace(enqueueResult.sendException)
368+
}
403369
}
404370
}
371+
// hm... receiver is waiting or buffer is not full. try to offer
372+
val offerResult = offerSelectInternal(element, select)
373+
when {
374+
offerResult === ALREADY_SELECTED -> return
375+
offerResult === OFFER_FAILED -> {} // retry
376+
offerResult === OFFER_SUCCESS -> {
377+
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
378+
return
379+
}
380+
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
381+
else -> error("offerSelectInternal returned $offerResult")
382+
}
405383
}
406384
}
407385

@@ -443,7 +421,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
443421
@JvmField val channel: SendChannel<E>,
444422
@JvmField val select: SelectInstance<R>,
445423
@JvmField val block: suspend (SendChannel<E>) -> R
446-
) : LockFreeLinkedListNode(), Send, DisposableHandle {
424+
) : Send(), DisposableHandle {
447425
override fun tryResumeSend(idempotent: Any?): Any? =
448426
if (select.trySelect(idempotent)) SELECT_STARTED else null
449427

@@ -452,11 +430,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
452430
block.startCoroutine(receiver = channel, completion = select.completion)
453431
}
454432

455-
fun disposeOnSelect() {
456-
select.disposeOnSelect(this)
457-
}
458-
459-
override fun dispose() {
433+
override fun dispose() { // invoked on select completion
460434
remove()
461435
}
462436

@@ -470,7 +444,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
470444

471445
internal class SendBuffered<out E>(
472446
@JvmField val element: E
473-
) : LockFreeLinkedListNode(), Send {
447+
) : Send() {
474448
override val pollResult: Any? get() = element
475449
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
476450
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
@@ -578,7 +552,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
578552

579553
private fun enqueueReceive(receive: Receive<E>): Boolean {
580554
val result = if (isBufferAlwaysEmpty)
581-
queue.addLastIfPrev(receive, { it !is Send }) else
555+
queue.addLastIfPrev(receive) { it !is Send } else
582556
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
583557
if (result) onReceiveEnqueued()
584558
return result
@@ -674,30 +648,6 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
674648
}
675649
}
676650

677-
private inner class TryEnqueueReceiveDesc<E, R>(
678-
select: SelectInstance<R>,
679-
block: suspend (Any?) -> R,
680-
receiveMode: Int
681-
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, receiveMode)) {
682-
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
683-
if (affected is Send) return ENQUEUE_FAILED
684-
return null
685-
}
686-
687-
override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
688-
if (!isBufferEmpty) return ENQUEUE_FAILED
689-
return super.onPrepare(affected, next)
690-
}
691-
692-
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
693-
super.finishOnSuccess(affected, next)
694-
// notify the there is one more receiver
695-
onReceiveEnqueued()
696-
// we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
697-
node.removeOnSelectCompletion()
698-
}
699-
}
700-
701651
final override val onReceive: SelectClause1<E>
702652
get() = object : SelectClause1<E> {
703653
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
@@ -710,7 +660,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
710660
while (true) {
711661
if (select.isSelected) return
712662
if (isEmpty) {
713-
if (registerEnqueueDesc(select, block, RECEIVE_THROWS_ON_CLOSE)) return
663+
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_THROWS_ON_CLOSE)) return
714664
} else {
715665
val pollResult = pollSelectInternal(select)
716666
when {
@@ -738,7 +688,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
738688
while (true) {
739689
if (select.isSelected) return
740690
if (isEmpty) {
741-
if (registerEnqueueDesc(select, block, RECEIVE_NULL_ON_CLOSE)) return
691+
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_NULL_ON_CLOSE)) return
742692
} else {
743693
val pollResult = pollSelectInternal(select)
744694
when {
@@ -775,7 +725,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
775725
while (true) {
776726
if (select.isSelected) return
777727
if (isEmpty) {
778-
if (registerEnqueueDesc(select, block, RECEIVE_RESULT)) return
728+
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_RESULT)) return
779729
} else {
780730
val pollResult = pollSelectInternal(select)
781731
when {
@@ -794,18 +744,14 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
794744
}
795745
}
796746

797-
private fun <R, E> registerEnqueueDesc(
798-
select: SelectInstance<R>, block: suspend (E) -> R,
747+
private fun <R> enqueueReceiveSelect(
748+
select: SelectInstance<R>, block: suspend (Any?) -> R,
799749
receiveMode: Int
800750
): Boolean {
801-
@Suppress("UNCHECKED_CAST")
802-
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, receiveMode)
803-
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
804-
return when {
805-
enqueueResult === ALREADY_SELECTED -> true
806-
enqueueResult === ENQUEUE_FAILED -> false // retry
807-
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
808-
}
751+
val node = ReceiveSelect(this, select, block, receiveMode)
752+
val result = enqueueReceive(node)
753+
if (result) select.disposeOnSelect(node)
754+
return result
809755
}
810756

811757
// ------ protected ------
@@ -960,7 +906,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
960906
override fun toString(): String = "ReceiveHasNext[$cont]"
961907
}
962908

963-
private inner class ReceiveSelect<R, in E>(
909+
private class ReceiveSelect<R, E>(
910+
@JvmField val channel: AbstractChannel<E>,
964911
@JvmField val select: SelectInstance<R>,
965912
@JvmField val block: suspend (Any?) -> R,
966913
@JvmField val receiveMode: Int
@@ -987,13 +934,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
987934
}
988935
}
989936

990-
fun removeOnSelectCompletion() {
991-
select.disposeOnSelect(this)
992-
}
993-
994937
override fun dispose() { // invoked on select completion
995938
if (remove())
996-
onReceiveDequeued() // notify cancellation of receive
939+
channel.onReceiveDequeued() // notify cancellation of receive
997940
}
998941

999942
override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
@@ -1051,11 +994,11 @@ internal typealias Handler = (Throwable?) -> Unit
1051994
/**
1052995
* Represents sending waiter in the queue.
1053996
*/
1054-
internal interface Send {
1055-
val pollResult: Any? // E | Closed
1056-
fun tryResumeSend(idempotent: Any?): Any?
1057-
fun completeResumeSend(token: Any)
1058-
fun resumeSendClosed(closed: Closed<*>)
997+
internal abstract class Send : LockFreeLinkedListNode() {
998+
abstract val pollResult: Any? // E | Closed
999+
abstract fun tryResumeSend(idempotent: Any?): Any?
1000+
abstract fun completeResumeSend(token: Any)
1001+
abstract fun resumeSendClosed(closed: Closed<*>)
10591002
}
10601003

10611004
/**
@@ -1074,7 +1017,7 @@ internal interface ReceiveOrClosed<in E> {
10741017
internal class SendElement(
10751018
override val pollResult: Any?,
10761019
@JvmField val cont: CancellableContinuation<Unit>
1077-
) : LockFreeLinkedListNode(), Send {
1020+
) : Send() {
10781021
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
10791022
override fun completeResumeSend(token: Any) = cont.completeResume(token)
10801023
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
@@ -1086,7 +1029,7 @@ internal class SendElement(
10861029
*/
10871030
internal class Closed<in E>(
10881031
@JvmField val closeCause: Throwable?
1089-
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
1032+
) : Send(), ReceiveOrClosed<E> {
10901033
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
10911034
val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
10921035

kotlinx-coroutines-core/common/src/selects/Select.kt

+12-19
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,6 @@ public interface SelectInstance<in R> {
110110
*/
111111
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
112112

113-
/**
114-
* Performs action atomically when [isSelected] is `false`.
115-
*/
116-
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
117-
118113
/**
119114
* Returns completion continuation of this select instance.
120115
* This select instance must be _selected_ first.
@@ -129,6 +124,7 @@ public interface SelectInstance<in R> {
129124

130125
/**
131126
* Disposes the specified handle when this instance is selected.
127+
* Note, that [DisposableHandle.dispose] could be called multiple times.
132128
*/
133129
public fun disposeOnSelect(handle: DisposableHandle)
134130
}
@@ -329,16 +325,14 @@ internal class SelectBuilderImpl<in R>(
329325

330326
override fun disposeOnSelect(handle: DisposableHandle) {
331327
val node = DisposeNode(handle)
332-
while (true) { // lock-free loop on state
333-
val state = this.state
334-
if (state === this) {
335-
if (addLastIf(node, { this.state === this }))
336-
return
337-
} else { // already selected
338-
handle.dispose()
339-
return
340-
}
328+
// check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
329+
if (!isSelected) {
330+
addLast(node) // add handle to list
331+
// double-check node after adding
332+
if (!isSelected) return // all ok - still not selected
341333
}
334+
// already selected
335+
handle.dispose()
342336
}
343337

344338
private fun doAfterSelect() {
@@ -368,12 +362,11 @@ internal class SelectBuilderImpl<in R>(
368362
}
369363
}
370364

371-
override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
372-
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
365+
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
366+
AtomicSelectOp(desc).perform(null)
373367

374368
private inner class AtomicSelectOp(
375-
@JvmField val desc: AtomicDesc,
376-
@JvmField val select: Boolean
369+
@JvmField val desc: AtomicDesc
377370
) : AtomicOp<Any?>() {
378371
override fun prepare(affected: Any?): Any? {
379372
// only originator of operation makes preparation move of installing descriptor into this selector's state
@@ -405,7 +398,7 @@ internal class SelectBuilderImpl<in R>(
405398
}
406399

407400
private fun completeSelect(failure: Any?) {
408-
val selectSuccess = select && failure == null
401+
val selectSuccess = failure == null
409402
val update = if (selectSuccess) null else this@SelectBuilderImpl
410403
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
411404
if (selectSuccess)

0 commit comments

Comments
 (0)