Skip to content

Optimize select expression registration phase #1445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,6 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
public fun invoke (Lkotlinx/coroutines/selects/SelectClause2;Lkotlin/jvm/functions/Function2;)V
public fun isSelected ()Z
public fun onTimeout (JLkotlin/jvm/functions/Function1;)V
public fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
Expand All @@ -1068,7 +1067,6 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
public abstract fun disposeOnSelect (Lkotlinx/coroutines/DisposableHandle;)V
public abstract fun getCompletion ()Lkotlin/coroutines/Continuation;
public abstract fun isSelected ()Z
public abstract fun performAtomicIfNotSelected (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
public abstract fun resumeSelectCancellableWithException (Ljava/lang/Throwable;)V
public abstract fun trySelect (Ljava/lang/Object;)Z
Expand Down
155 changes: 49 additions & 106 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
queue.addLastIfPrev(SendBuffered(element), { prev ->
queue.addLastIfPrev(SendBuffered(element)) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
true
})
}
return null
}

Expand Down Expand Up @@ -170,8 +170,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
val send = SendElement(element, cont)
loop@ while (true) {
val enqueueResult = enqueueSend(send)
when (enqueueResult) {
when (val enqueueResult = enqueueSend(send)) {
null -> { // enqueued successfully
cont.removeOnCancellation(send)
return@sc
Expand Down Expand Up @@ -206,12 +205,12 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
*/
private fun enqueueSend(send: SendElement): Any? {
private fun enqueueSend(send: Send): Any? {
if (isBufferAlwaysFull) {
queue.addLastIfPrev(send, { prev ->
queue.addLastIfPrev(send) { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
true
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 220: return ENQUEUE_FAILED

Nobody checks for ENQUEUE_FAILED. It doesn't look like a bug because of the current state machine (null -> success, Closed -> closed, everything else -> retry), but kind of error-prone.

For example, null can become ENQUEUE_SUCCESS and ENQUEUE_FAILED will become null instead. Code flow will be more straightforward and explicit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've rewritten the corresponding when statements in a more explicit way, checking all possible outcomes and added error on unexpected result.

} else {
if (!queue.addLastIfPrevAndIf(send, { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
Expand Down Expand Up @@ -346,30 +345,6 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
}
}

private inner class TryEnqueueSendDesc<R>(
element: E,
select: SelectInstance<R>,
block: suspend (SendChannel<E>) -> R
) : AddLastDesc<SendSelect<E, R>>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is ReceiveOrClosed<*>) {
return affected as? Closed<*> ?: ENQUEUE_FAILED
}
return null
}

override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
if (!isBufferFull) return ENQUEUE_FAILED
return super.onPrepare(affected, next)
}

override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
super.finishOnSuccess(affected, next)
// we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
node.disposeOnSelect()
}
}

final override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
Expand All @@ -381,27 +356,30 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
while (true) {
if (select.isSelected) return
if (full) {
val enqueueOp = TryEnqueueSendDesc(element, select, block)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
}
} else {
val offerResult = offerSelectInternal(element, select)
when {
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === OFFER_SUCCESS -> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
val node = SendSelect(element, this, select, block)
when (val enqueueResult = enqueueSend(node)) {
null -> { // enqueued successfully
select.disposeOnSelect(node)
return
}
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
else -> error("offerSelectInternal returned $offerResult")
is Closed<*> -> {
helpClose(enqueueResult)
throw recoverStackTrace(enqueueResult.sendException)
}
}
}
// hm... receiver is waiting or buffer is not full. try to offer
val offerResult = offerSelectInternal(element, select)
when {
offerResult === ALREADY_SELECTED -> return
offerResult === OFFER_FAILED -> {} // retry
offerResult === OFFER_SUCCESS -> {
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
return
}
offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
else -> error("offerSelectInternal returned $offerResult")
}
}
}

Expand Down Expand Up @@ -443,7 +421,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
@JvmField val channel: SendChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (SendChannel<E>) -> R
) : LockFreeLinkedListNode(), Send, DisposableHandle {
) : Send(), DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null

Expand All @@ -452,11 +430,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
block.startCoroutine(receiver = channel, completion = select.completion)
}

fun disposeOnSelect() {
select.disposeOnSelect(this)
}

override fun dispose() {
override fun dispose() { // invoked on select completion
remove()
}

Expand All @@ -470,7 +444,7 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {

internal class SendBuffered<out E>(
@JvmField val element: E
) : LockFreeLinkedListNode(), Send {
) : Send() {
override val pollResult: Any? get() = element
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
Expand Down Expand Up @@ -578,7 +552,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private fun enqueueReceive(receive: Receive<E>): Boolean {
val result = if (isBufferAlwaysEmpty)
queue.addLastIfPrev(receive, { it !is Send }) else
queue.addLastIfPrev(receive) { it !is Send } else
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
if (result) onReceiveEnqueued()
return result
Expand Down Expand Up @@ -674,30 +648,6 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

private inner class TryEnqueueReceiveDesc<E, R>(
select: SelectInstance<R>,
block: suspend (Any?) -> R,
receiveMode: Int
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, receiveMode)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is Send) return ENQUEUE_FAILED
return null
}

override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
if (!isBufferEmpty) return ENQUEUE_FAILED
return super.onPrepare(affected, next)
}

override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
super.finishOnSuccess(affected, next)
// notify the there is one more receiver
onReceiveEnqueued()
// we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
node.removeOnSelectCompletion()
}
}

final override val onReceive: SelectClause1<E>
get() = object : SelectClause1<E> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
Expand All @@ -710,7 +660,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
if (registerEnqueueDesc(select, block, RECEIVE_THROWS_ON_CLOSE)) return
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_THROWS_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
Expand Down Expand Up @@ -738,7 +688,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
if (registerEnqueueDesc(select, block, RECEIVE_NULL_ON_CLOSE)) return
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_NULL_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
Expand Down Expand Up @@ -775,7 +725,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
if (registerEnqueueDesc(select, block, RECEIVE_RESULT)) return
if (enqueueReceiveSelect(select, block as suspend (Any?) -> R, RECEIVE_RESULT)) return
} else {
val pollResult = pollSelectInternal(select)
when {
Expand All @@ -794,18 +744,14 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

private fun <R, E> registerEnqueueDesc(
select: SelectInstance<R>, block: suspend (E) -> R,
private fun <R> enqueueReceiveSelect(
select: SelectInstance<R>, block: suspend (Any?) -> R,
receiveMode: Int
): Boolean {
@Suppress("UNCHECKED_CAST")
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, receiveMode)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
return when {
enqueueResult === ALREADY_SELECTED -> true
enqueueResult === ENQUEUE_FAILED -> false // retry
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
}
val node = ReceiveSelect(this, select, block, receiveMode)
val result = enqueueReceive(node)
if (result) select.disposeOnSelect(node)
return result
}

// ------ protected ------
Expand Down Expand Up @@ -960,7 +906,8 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
override fun toString(): String = "ReceiveHasNext[$cont]"
}

private inner class ReceiveSelect<R, in E>(
private class ReceiveSelect<R, E>(
@JvmField val channel: AbstractChannel<E>,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
Expand All @@ -987,13 +934,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

fun removeOnSelectCompletion() {
select.disposeOnSelect(this)
}

override fun dispose() { // invoked on select completion
if (remove())
onReceiveDequeued() // notify cancellation of receive
channel.onReceiveDequeued() // notify cancellation of receive
}

override fun toString(): String = "ReceiveSelect[$select,receiveMode=$receiveMode]"
Expand Down Expand Up @@ -1051,11 +994,11 @@ internal typealias Handler = (Throwable?) -> Unit
/**
* Represents sending waiter in the queue.
*/
internal interface Send {
val pollResult: Any? // E | Closed
fun tryResumeSend(idempotent: Any?): Any?
fun completeResumeSend(token: Any)
fun resumeSendClosed(closed: Closed<*>)
internal abstract class Send : LockFreeLinkedListNode() {
abstract val pollResult: Any? // E | Closed
abstract fun tryResumeSend(idempotent: Any?): Any?
abstract fun completeResumeSend(token: Any)
abstract fun resumeSendClosed(closed: Closed<*>)
}

/**
Expand All @@ -1074,7 +1017,7 @@ internal interface ReceiveOrClosed<in E> {
internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation<Unit>
) : LockFreeLinkedListNode(), Send {
) : Send() {
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun completeResumeSend(token: Any) = cont.completeResume(token)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
Expand All @@ -1086,7 +1029,7 @@ internal class SendElement(
*/
internal class Closed<in E>(
@JvmField val closeCause: Throwable?
) : LockFreeLinkedListNode(), Send, ReceiveOrClosed<E> {
) : Send(), ReceiveOrClosed<E> {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

Expand Down
31 changes: 12 additions & 19 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,6 @@ public interface SelectInstance<in R> {
*/
public fun performAtomicTrySelect(desc: AtomicDesc): Any?

/**
* Performs action atomically when [isSelected] is `false`.
*/
public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?

/**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
Expand All @@ -129,6 +124,7 @@ public interface SelectInstance<in R> {

/**
* Disposes the specified handle when this instance is selected.
* Note, that [DisposableHandle.dispose] could be called multiple times.
*/
public fun disposeOnSelect(handle: DisposableHandle)
}
Expand Down Expand Up @@ -329,16 +325,14 @@ internal class SelectBuilderImpl<in R>(

override fun disposeOnSelect(handle: DisposableHandle) {
val node = DisposeNode(handle)
while (true) { // lock-free loop on state
val state = this.state
if (state === this) {
if (addLastIf(node, { this.state === this }))
return
} else { // already selected
handle.dispose()
return
}
// check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
if (!isSelected) {
addLast(node) // add handle to list
// double-check node after adding
if (!isSelected) return // all ok - still not selected
}
// already selected
handle.dispose()
}

private fun doAfterSelect() {
Expand Down Expand Up @@ -368,12 +362,11 @@ internal class SelectBuilderImpl<in R>(
}
}

override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
AtomicSelectOp(desc).perform(null)

private inner class AtomicSelectOp(
@JvmField val desc: AtomicDesc,
@JvmField val select: Boolean
@JvmField val desc: AtomicDesc
) : AtomicOp<Any?>() {
override fun prepare(affected: Any?): Any? {
// only originator of operation makes preparation move of installing descriptor into this selector's state
Expand Down Expand Up @@ -405,7 +398,7 @@ internal class SelectBuilderImpl<in R>(
}

private fun completeSelect(failure: Any?) {
val selectSuccess = select && failure == null
val selectSuccess = failure == null
val update = if (selectSuccess) null else this@SelectBuilderImpl
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
if (selectSuccess)
Expand Down
Loading