Skip to content

Commit fef6482

Browse files
committed
Multithreaded support in select expression
Fixes #1764
1 parent a4c3a48 commit fef6482

File tree

15 files changed

+258
-67
lines changed

15 files changed

+258
-67
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+8-4
Original file line numberDiff line numberDiff line change
@@ -1073,8 +1073,8 @@ public final class kotlinx/coroutines/selects/SelectBuilderImpl : kotlinx/corout
10731073
public fun resumeSelectWithException (Ljava/lang/Throwable;)V
10741074
public fun resumeWith (Ljava/lang/Object;)V
10751075
public fun toString ()Ljava/lang/String;
1076-
public fun trySelect ()Z
1077-
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1076+
public fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1077+
public fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
10781078
}
10791079

10801080
public abstract interface class kotlinx/coroutines/selects/SelectClause0 {
@@ -1095,8 +1095,12 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
10951095
public abstract fun isSelected ()Z
10961096
public abstract fun performAtomicTrySelect (Lkotlinx/coroutines/internal/AtomicDesc;)Ljava/lang/Object;
10971097
public abstract fun resumeSelectWithException (Ljava/lang/Throwable;)V
1098-
public abstract fun trySelect ()Z
1099-
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;)Ljava/lang/Object;
1098+
public abstract fun trySelect (Lkotlin/jvm/functions/Function0;)Z
1099+
public abstract fun trySelectOther (Lkotlinx/coroutines/internal/LockFreeLinkedListNode$PrepareOp;Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
1100+
}
1101+
1102+
public final class kotlinx/coroutines/selects/SelectInstance$DefaultImpls {
1103+
public static synthetic fun trySelect$default (Lkotlinx/coroutines/selects/SelectInstance;Lkotlin/jvm/functions/Function0;ILjava/lang/Object;)Z
11001104
}
11011105

11021106
public final class kotlinx/coroutines/selects/SelectKt {

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,6 @@ public abstract class AbstractCoroutine<in T>(
138138
*/
139139
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
140140
initParentJob()
141-
startCoroutine(start, this, receiver, block)
141+
startCoroutine(start, receiver, this, block)
142142
}
143143
}

kotlinx-coroutines-core/common/src/Builders.common.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,13 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
174174

175175
internal fun <T, R> startCoroutineImpl(
176176
start: CoroutineStart,
177-
coroutine: AbstractCoroutine<T>,
178177
receiver: R,
178+
completion: Continuation<T>,
179179
block: suspend R.() -> T
180180
) = when (start) {
181-
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, coroutine)
182-
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, coroutine)
183-
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, coroutine)
181+
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
182+
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
183+
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
184184
CoroutineStart.LAZY -> Unit // will start lazily
185185
}
186186

@@ -189,8 +189,8 @@ internal fun <T, R> startCoroutineImpl(
189189
// todo: impl a separate startCoroutineCancellable as a fast-path for startCoroutine(DEFAULT, ...)
190190
internal expect fun <T, R> startCoroutine(
191191
start: CoroutineStart,
192-
coroutine: AbstractCoroutine<T>,
193192
receiver: R,
193+
completion: Continuation<T>,
194194
block: suspend R.() -> T
195195
)
196196

kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt

+56-22
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,10 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
382382
select.disposeOnSelect(node)
383383
return
384384
}
385-
enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
385+
enqueueResult is Closed<*> -> {
386+
node.dispose()
387+
throw recoverStackTrace(helpCloseAndGetSendException(enqueueResult))
388+
}
386389
enqueueResult === ENQUEUE_FAILED -> {} // try to offer
387390
enqueueResult is Receive<*> -> {} // try to offer
388391
else -> error("enqueueSend returned $enqueueResult ")
@@ -448,16 +451,19 @@ internal abstract class AbstractSendChannel<E> : SendChannel<E> {
448451
override val pollResult: Any?,
449452
@JvmField val channel: AbstractSendChannel<E>,
450453
@JvmField val select: SelectInstance<R>,
451-
@JvmField val block: suspend (SendChannel<E>) -> R
454+
block: suspend (SendChannel<E>) -> R
452455
) : Send(), DisposableHandle {
456+
@JvmField val block: suspend (SendChannel<E>) -> R = block.asShareable()
457+
453458
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
454-
select.trySelectOther(otherOp) as Symbol? // must return symbol
459+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
455460

456461
override fun completeResumeSend() {
457-
block.startCoroutine(receiver = channel, completion = select.completion)
462+
startCoroutine(CoroutineStart.ATOMIC, channel, select.completion, block)
458463
}
459464

460465
override fun dispose() { // invoked on select completion
466+
block.shareableDispose(useIt = false)
461467
remove()
462468
}
463469

@@ -773,7 +779,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
773779
): Boolean {
774780
val node = ReceiveSelect(this, select, block, receiveMode)
775781
val result = enqueueReceive(node)
776-
if (result) select.disposeOnSelect(node)
782+
if (result) {
783+
select.disposeOnSelect(node)
784+
} else {
785+
node.dispose()
786+
}
777787
return result
778788
}
779789

@@ -871,26 +881,31 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
871881
}
872882

873883
private class ReceiveElement<in E>(
874-
@JvmField val cont: CancellableContinuation<Any?>,
884+
cont: CancellableContinuation<Any?>,
875885
@JvmField val receiveMode: Int
876886
) : Receive<E>() {
887+
private val _cont = atomic<CancellableContinuation<Any?>?>(cont)
888+
877889
fun resumeValue(value: E): Any? = when (receiveMode) {
878890
RECEIVE_RESULT -> ValueOrClosed.value(value)
879891
else -> value
880892
}
881893

882894
@Suppress("IMPLICIT_CAST_TO_ANY")
883895
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
884-
val token = cont.tryResume(resumeValue(value), otherOp?.desc) ?: return null
896+
val token = _cont.value?.tryResume(resumeValue(value), otherOp?.desc) ?: return null
885897
assert { token === RESUME_TOKEN } // the only other possible result
886898
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
887899
otherOp?.finishPrepare()
888900
return RESUME_TOKEN
889901
}
890902

891-
override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
903+
override fun completeResumeReceive(value: E) {
904+
_cont.getAndSet(null)!!.completeResume(RESUME_TOKEN)
905+
}
892906

893907
override fun resumeReceiveClosed(closed: Closed<*>) {
908+
val cont = _cont.getAndSet(null)!!
894909
when {
895910
receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
896911
receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
@@ -902,10 +917,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
902917

903918
private class ReceiveHasNext<E>(
904919
@JvmField val iterator: Itr<E>,
905-
@JvmField val cont: CancellableContinuation<Boolean>
920+
cont: CancellableContinuation<Boolean>
906921
) : Receive<E>() {
922+
private val _cont = atomic<CancellableContinuation<Boolean>?>(cont)
923+
907924
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
908-
val token = cont.tryResume(true, otherOp?.desc) ?: return null
925+
val token = _cont.value?.tryResume(true, otherOp?.desc) ?: return null
909926
assert { token === RESUME_TOKEN } // the only other possible result
910927
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
911928
otherOp?.finishPrepare()
@@ -918,10 +935,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
918935
but completeResumeReceive is called once so we set iterator result here.
919936
*/
920937
iterator.result = value
921-
cont.completeResume(RESUME_TOKEN)
938+
_cont.getAndSet(null)!!.completeResume(RESUME_TOKEN)
922939
}
923940

924941
override fun resumeReceiveClosed(closed: Closed<*>) {
942+
val cont = _cont.getAndSet(null)!!
925943
val token = if (closed.closeCause == null) {
926944
cont.tryResume(false)
927945
} else {
@@ -938,31 +956,38 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
938956
private class ReceiveSelect<R, E>(
939957
@JvmField val channel: AbstractChannel<E>,
940958
@JvmField val select: SelectInstance<R>,
941-
@JvmField val block: suspend (Any?) -> R,
959+
block: suspend (Any?) -> R,
942960
@JvmField val receiveMode: Int
943961
) : Receive<E>(), DisposableHandle {
962+
@JvmField val block: suspend (Any?) -> R = block.asShareable() // captured variables in this block need screening
963+
944964
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
945-
select.trySelectOther(otherOp) as Symbol?
965+
select.trySelectOther(otherOp, onSelect = block::shareableWillBeUsed) as Symbol?
946966

947967
@Suppress("UNCHECKED_CAST")
948968
override fun completeResumeReceive(value: E) {
949-
block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
969+
startCoroutine(CoroutineStart.ATOMIC, if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion, block)
950970
}
951971

952972
override fun resumeReceiveClosed(closed: Closed<*>) {
953-
if (!select.trySelect()) return
973+
if (!select.trySelect(onSelect = block::shareableWillBeUsed)) return
954974
when (receiveMode) {
955-
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
956-
RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
975+
RECEIVE_THROWS_ON_CLOSE -> {
976+
block.shareableDispose(useIt = true)
977+
select.resumeSelectWithException(closed.receiveException)
978+
}
979+
RECEIVE_RESULT -> startCoroutine(CoroutineStart.ATOMIC, ValueOrClosed.closed<R>(closed.closeCause), select.completion, block)
957980
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
958-
block.startCoroutine(null, select.completion)
981+
startCoroutine(CoroutineStart.ATOMIC, null, select.completion, block)
959982
} else {
983+
block.shareableDispose(useIt = true)
960984
select.resumeSelectWithException(closed.receiveException)
961985
}
962986
}
963987
}
964988

965989
override fun dispose() { // invoked on select completion
990+
block.shareableDispose(useIt = false)
966991
if (remove())
967992
channel.onReceiveDequeued() // notify cancellation of receive
968993
}
@@ -1031,17 +1056,26 @@ internal interface ReceiveOrClosed<in E> {
10311056
@Suppress("UNCHECKED_CAST")
10321057
internal class SendElement(
10331058
override val pollResult: Any?,
1034-
@JvmField val cont: CancellableContinuation<Unit>
1059+
cont: CancellableContinuation<Unit>
10351060
) : Send() {
1061+
private val _cont = atomic<CancellableContinuation<Unit>?>(cont)
1062+
10361063
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1037-
val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
1064+
val token = _cont.value?.tryResume(Unit, otherOp?.desc) ?: return null
10381065
assert { token === RESUME_TOKEN } // the only other possible result
10391066
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
10401067
otherOp?.finishPrepare() // finish preparations
10411068
return RESUME_TOKEN
10421069
}
1043-
override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
1044-
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1070+
1071+
override fun completeResumeSend() {
1072+
_cont.getAndSet(null)!!.completeResume(RESUME_TOKEN)
1073+
}
1074+
1075+
override fun resumeSendClosed(closed: Closed<*>) {
1076+
_cont.getAndSet(null)!!.resumeWithException(closed.sendException)
1077+
}
1078+
10451079
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
10461080
}
10471081

kotlinx-coroutines-core/common/src/internal/Sharing.common.kt

+7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@ internal expect fun <T> Continuation<T>.asLocalOrNull() : Continuation<T>?
1717
internal expect fun <T> Continuation<T>.asLocalOrNullIfNotUsed() : Continuation<T>?
1818
internal expect fun <T> Continuation<T>.useLocal() : Continuation<T>
1919
internal expect fun <T> Continuation<T>.shareableInterceptedResumeCancellableWith(result: Result<T>)
20+
internal expect fun <T> Continuation<T>.shareableInterceptedResumeWith(result: Result<T>)
21+
internal expect fun <T> Continuation<T>.shareableDispose()
2022
internal expect fun disposeContinuation(cont: () -> Continuation<*>)
2123
internal expect fun <T> CancellableContinuationImpl<T>.shareableResume(delegate: Continuation<T>, useMode: Int)
24+
25+
internal expect fun <T, R> (suspend (T) -> R).asShareable(): suspend (T) -> R
26+
internal expect fun <T, R> (suspend (T) -> R).shareableDispose(useIt: Boolean)
27+
internal expect fun <T, R> (suspend (T) -> R).shareableWillBeUsed()
28+
2229
internal expect fun isReuseSupportedInPlatform(): Boolean
2330
internal expect fun <T> ArrayList<T>.addOrUpdate(element: T, update: (ArrayList<T>) -> Unit)
2431
internal expect fun <T> ArrayList<T>.addOrUpdate(index: Int, element: T, update: (ArrayList<T>) -> Unit)

0 commit comments

Comments
 (0)